Commit c3ddcf2e by 孙龙

up

parents
/logs
# ide
.idea
/config/*.toml
*.properties
*.exe
*.exe~
gowatch.yml
\ No newline at end of file
# golang-asynctask_demo
## 如何使用该项目 :
#### 该示例的项目名称为 : go-supplier-sync , 如果你要使用,请自行替换成你的项目名称
### 1.clone项目
git clone ssh://git@119.23.72.7:22611/sunlong_v5/golang-asynctask_demo.git
### 2.复制项目
1.将golang-asynctask_demo复制一份成你的自己的项目,比如go-supplier-sync
2.删除.git文件夹,重新初始化git
### 3.修改项目文件夹名称
1.将 golang-asynctask_demo 改成 go-supplier-sync
2.编辑器批量修改包名 : golang-asynctask_demo/ => go-supplier-sync/
### 4.修改go.mod文件(重要)
一定要修改go.mod文件,要不然无法跑起来
把go.mod的第一行 : module go-supplier-sync 修改成 module go-supplier-sync
### 5.修改http监听地址
config/proxy.toml.http.addr 修改成你的ip地址或者本地127.0.0.1
### 6.项目启动
go run ./cmd/main.go
go run ./cmd/main.go -config=./config/dev/ -logdir = ./logs/
##项目的目录结构
├── app 项目
│ ├── common 公共文件
│ ├── controller 业务逻辑访问入口
│ ├── dao 数据层操作(对mysql 等数据操作)
│ ├── model 数据层模型 (数据库表定义)
│ └── service 处理业务逻辑
├── boot 启动文件
├── cmd 入口
├── config 配置
├── document
├── util 包管理
├── middleware 中间件
├── router 路由
├── resources
├── go.mod
参考文献:
xorm:https://www.kancloud.cn/xormplus/xorm/413729
go-redis-pool https://github.com/ichunt2019/go-redis-pool
package config
import (
"github.com/ichunt2019/cfg/lib"
)
type BaseDatabase struct {
DataSourceName string
MaxIdleCons int
MaxOpenCons int
Prefix string
}
type GroupDatabase struct {
DataSourceName []string
MaxIdleCons int
MaxOpenCons int
Prefix string
}
//单、主数据 master
func BuildDatabaseList() (DatabaseList map[string]BaseDatabase) {
return map[string]BaseDatabase{
"crm": {
DataSourceName: lib.Instance("db").GetString("crm.data_source_name"),
Prefix: lib.Instance("db").GetString("crm.table_prefix"),
MaxIdleCons:lib.Instance("db").GetInt("crm.max_idle_conn"),
MaxOpenCons:lib.Instance("db").GetInt("crm.max_open_conn"),
},
}
}
//主从数据 master slave
func BuildGroupDatabaseList() (DatabaseList map[string]GroupDatabase) {
return map[string]GroupDatabase{
//"liexin": {
// DataSourceName:lib.Instance("db").GetStringSlice("liexin.data_source_name"),
// Prefix: lib.Instance("db").GetString("liexin.table_prefix"),
// MaxIdleCons:lib.Instance("db").GetInt("liexin.max_idle_conn"),
// MaxOpenCons:lib.Instance("db").GetInt("liexin.max_open_conn"),
//},
}
}
package config
import (
"github.com/ichunt2019/cfg/lib"
"time"
)
type RedisGroupDatabase struct {
MasterHost string
Password string //master密码
SlaveHost []string
ReadonlyPassword string //从服务器密码
DialTimeout time.Duration //连接超时
MinIdleConns int //空间链接
ReadTimeout time.Duration //读超时
WriteTimeout time.Duration //写超时
}
//多数据库配置
func BuildRedisGroupConfigs() ( map[string]RedisGroupDatabase) {
//fmt.Println("6666666666")
//fmt.Println(lib.Instance("redis").GetStringSlice("api.slave.host"))
return map[string]RedisGroupDatabase{
"api": {
MasterHost: lib.Instance("redis").GetString("api.master.host"),
Password:lib.Instance("redis").GetString("api.master.password"),
SlaveHost: lib.Instance("redis").GetStringSlice("api.slave.host"),
ReadonlyPassword:lib.Instance("redis").GetString("api.slave.password"),
DialTimeout: time.Duration(lib.Instance("redis").GetInt("api.dial_timeout"))*time.Second,
ReadTimeout: time.Duration(lib.Instance("redis").GetInt("api.read_timeout"))*time.Second,
WriteTimeout: time.Duration(lib.Instance("redis").GetInt("api.write_timeout"))*time.Second,
MinIdleConns:lib.Instance("redis").GetInt("api.min_idle_conns"),
},
"sku": {
MasterHost: lib.Instance("redis").GetString("sku.master.host"),
Password:lib.Instance("redis").GetString("sku.master.password"),
SlaveHost: lib.Instance("redis").GetStringSlice("sku.slave.host"),
ReadonlyPassword:lib.Instance("redis").GetString("sku.slave.password"),
DialTimeout: time.Duration(lib.Instance("redis").GetInt("sku.dial_timeout"))*time.Second,
ReadTimeout: time.Duration(lib.Instance("redis").GetInt("sku.read_timeout"))*time.Second,
WriteTimeout: time.Duration(lib.Instance("redis").GetInt("sku.write_timeout"))*time.Second,
MinIdleConns:lib.Instance("redis").GetInt("sku.min_idle_conns"),
},
}
}
package common
import (
"github.com/gin-gonic/gin"
"strings"
)
type RecommendRequest struct {
GoodsName string `form:"goods_name"`
Attrs string `form:"attrs"`
Encap string `form:"encap"`
Num int `form:"num"`
DeliveryType int `form:"delivery_type"`
Flag int `form:"flag"`
BrandName string `form:"brand_name"`
}
//获取所有请求参数放到字典里面
func GetAllRequestParams(c *gin.Context) (request map[string]string){
request = make(map[string]string)
c.MultipartForm()
for name, value := range c.Request.Form {
if value[0] != "" {
request[name] = strings.TrimSpace(value[0])
}
}
return
}
package common
import (
"github.com/gin-gonic/gin"
"strings"
)
type Response struct {
ErrCode int `json:"errcode"`
ErrMsg string `json:"errmsg"`
Data interface{} `json:"data"`
}
type BomResponse struct {
ErrCode int `json:"error_code"`
ErrMsg string `json:"error_msg"`
Flag int `json:"flag"`
Total int `json:"total"`
Data interface{} `json:"data"`
}
func SuccessResponse(errCode int, errMsg string, data interface{}) Response {
return Response{
ErrCode: errCode,
ErrMsg: errMsg,
Data: data,
}
}
func ErrorResponse(errCode int, errMsg string) Response {
return Response{
ErrCode: errCode,
ErrMsg: errMsg,
Data: []string{},
}
}
//统一输出,里面还要去处理jsonp
func Output(ctx *gin.Context,errCode int, errMsg string, data interface{}) {
if data == nil {
data = []string{}
}
response := Response{
ErrCode: errCode,
ErrMsg: errMsg,
Data: data,
}
//if errCode >= 100 {
// //SearchApiLogger(ctx,errCode, errMsg)
//}
if ctx.DefaultQuery("callback", "") != "" {
ctx.JSONP(200, response)
} else {
referer := ctx.Request.Referer()
referer = strings.TrimRight(referer, "/")
ctx.Header("Access-Control-Allow-Origin", referer)
ctx.Header("Access-Control-Allow-Credentials", "true")
//允许跨站访问的站点域名
//跨域请求头设置
ctx.JSON(200, response)
}
}
//简单的返回数据方法
func ReturnData(ctx *gin.Context,errCode int, errMsg string, data interface{}) {
if data == nil {
data = []string{}
if errCode == 0 {
errCode = 1
}
}
response := Response{
ErrCode: errCode,
ErrMsg: errMsg,
Data: data,
}
ctx.JSON(200, response)
}
package common
import (
"reflect"
"sort"
)
func getCommon(array interface{}) (reflect.Type, reflect.Value, int) {
t := reflect.TypeOf(array)
v := reflect.ValueOf(array)
l := v.Len()
return t, v, l
}
func SortSlice(array interface{}) {
t, v, _ := getCommon(array)
// res := make([]interface{}, l)
if t.Kind() == reflect.Slice {
switch v.Index(0).Kind() {
case reflect.Int:
array := array.([]int)
sort.Ints(array)
case reflect.String:
array := array.([]string)
sort.Strings(array)
case reflect.Float64:
array := array.([]float64)
sort.Float64s(array)
default:
panic("the param can only be int/string/float64 array")
}
} else {
panic("expects parameter 1 to be array")
}
}
package controller
import (
"fmt"
"github.com/gin-gonic/gin"
"github.com/ichunt2019/cfg/lib"
xlog "github.com/ichunt2019/lxLog/log"
common "golang-asynctask/app/common/function"
"golang-asynctask/app/service"
)
func Ping(ctx *gin.Context) {
xlog.Instance("sku").Info("ping")
service.GetData()
common.Output(ctx, 0, "success",nil)
}
func Hbsdata(ctx *gin.Context) {
xlog.Instance("spu").Info("Hbsdata")
fmt.Println(lib.Instance("db").GetString("liexin.max_open_conn"))
fmt.Println(lib.Instance("db").GetStringSlice("liexin.data_source_name"))
//b := lib.Instance("db").GetStringSlice("liexin.slave.data_source_name")
//fmt.Println(b)
//fmt.Println(reflect.TypeOf(b))
common.Output(ctx, 0, "success",nil)
}
\ No newline at end of file
#dao
\ No newline at end of file
package crm
import (
"errors"
"fmt"
"github.com/tidwall/gjson"
"golang-asynctask/util/lib"
"net/http"
"net/url"
"strconv"
cfg "github.com/ichunt2019/cfg/lib"
)
type comUser struct {
ComId int64 `json:com_id`
UserId int64 `json:user_id`
SaleId int64 `json:sale_id`
}
func PushComUserInfoToErp(data string) (err error){
pushData := comUser{}
table := gjson.Parse(data).Get("table").String()
action_type := gjson.Parse(data).Get("type").String()
//fmt.Println(action_type)
//action_type = "delete"
if action_type != "update" && action_type != "insert"{
return nil
}
if table == "lie_invoice_company"{
pushData.ComId = gjson.Parse(data).Get("data.id").Int()
}else if(table == "lie_invoice_com_user"){
pushData.ComId = gjson.Parse(data).Get("data.com_id").Int()
pushData.UserId = gjson.Parse(data).Get("data.user_id").Int()
pushData.SaleId = gjson.Parse(data).Get("data.sale_id").Int()
fmt.Println("888")
if pushData.ComId == 0{
return nil
}
}else{
return nil
}
//fmt.Println("55555555555")
urlParams := url.Values{}
urlParams.Add("com_id",strconv.FormatInt(pushData.ComId,10))
urlParams.Add("user_id",strconv.FormatInt(pushData.UserId,10))
urlParams.Add("sale_id",strconv.FormatInt(pushData.SaleId,10))
//fmt.Println(urlParams)
header := http.Header{}
header.Set("api-key",cfg.Instance("config").GetString("apiKey"))
header.Set("Content-Type","application/json")
push_url := cfg.Instance("config").GetString("pushErpDomain")
response,returnData,err := lib.HttpPOST(push_url,urlParams,0,header,"")
//fmt.Println(response,string(returnData),err)
if err == nil && response.StatusCode == 200{
err_code := gjson.Parse(string(returnData)).Get("errcode").Int()
msg := gjson.Parse(string(returnData)).Get("errmsg").String()
if err_code != 0{
err = errors.New(msg)
}
}
//fmt.Println("err",err)
return err
}
package dao
import (
"sync"
"time"
"xorm.io/xorm"
"github.com/go-redis/redis/v7"
_ "github.com/go-sql-driver/mysql"
"github.com/ichunt2019/cfg/lib"
redisPool "github.com/ichunt2019/go-redis-pool"
"golang-asynctask/app/common/config"
)
var (
once sync.Once
Dao *dao
)
type dao struct {
db map[string]*xorm.Engine //非主从mysql数据库 连接池
dbGroup map[string]*xorm.EngineGroup //mysql主从 连接池
redisGroup map[string]*redisPool.Pool //redis 主从 连接池
}
//获取db实例
func (self *dao) GetDb(databases string) *xorm.Engine {
return self.db[databases]
}
//获取主从db实例
//获取主从db实例
func (self *dao) GetDbGroup(databases string) *xorm.EngineGroup {
return self.dbGroup[databases]
}
//获取主从db实例
func (self *dao) GetRedisDbGroup(databases string) *redisPool.Pool {
return self.redisGroup[databases]
}
func mysqlSetup(d *dao) *dao {
var (
err error
)
DatabaseList := config.BuildDatabaseList()
GroupDatabaseList := config.BuildGroupDatabaseList()
if len(DatabaseList) > 0 {
for conName, db := range DatabaseList {
d.db[conName], err = xorm.NewEngine("mysql", db.DataSourceName)
if err != nil {
panic(err)
}
//日志打印SQL
ShowSql := lib.Instance("db").GetBool("xorm.ShowSQL")
d.db[conName].ShowSQL(ShowSql)
//设置连接池的空闲数大小
d.db[conName].SetMaxIdleConns(db.MaxIdleCons)
//设置最大打开连接数
d.db[conName].SetMaxOpenConns(db.MaxOpenCons)
}
}
if len(GroupDatabaseList) > 0 {
for conName, db := range GroupDatabaseList {
d.dbGroup[conName], err = xorm.NewEngineGroup("mysql", db.DataSourceName)
if err != nil {
panic(err)
}
//日志打印SQL
ShowSql := lib.Instance("db").GetBool("xorm.ShowSQL")
d.dbGroup[conName].ShowSQL(ShowSql)
//设置连接池的空闲数大小
d.dbGroup[conName].SetMaxIdleConns(db.MaxIdleCons)
//设置最大打开连接数
d.dbGroup[conName].SetMaxOpenConns(db.MaxOpenCons)
}
}
return d
}
func redisSetup(d *dao) *dao {
var err error
redisGroupList := config.BuildRedisGroupConfigs()
//fmt.Println(redisGroupList)
for redisServerName, redisInfo := range redisGroupList {
d.redisGroup[redisServerName], err = redisPool.NewHA(&redisPool.HAConfig{
Master: redisInfo.MasterHost,
Slaves: redisInfo.SlaveHost,
Password: redisInfo.Password,
ReadonlyPassword: redisInfo.ReadonlyPassword,
Options: &redis.Options{
DialTimeout: redisInfo.DialTimeout, //连接超时
MinIdleConns: redisInfo.MinIdleConns, //空闲链接数
ReadTimeout: redisInfo.ReadTimeout,
WriteTimeout: redisInfo.WriteTimeout,
},
AutoEjectHost: true,//是否弹出故障主机
ServerFailureLimit: 3,//达到失败次数时弹出
ServerRetryTimeout: 5 * time.Second,//在“ServerRetryTimeout”之后重试弹出的主机`
MinServerNum: 0,//保留min服务器 针对从服务器
})
if err != nil {
panic(err)
}
}
return d
}
func Init() {
Dao = &dao{}
once.Do(func() {
//单、主数据 master
Dao.db = make(map[string]*xorm.Engine, 0)
//主从数据 master slave
Dao.dbGroup = make(map[string]*xorm.EngineGroup, 0)
//redis连接池 支持主从 master slave
Dao.redisGroup = make(map[string]*redisPool.Pool, 0)
Dao = mysqlSetup(Dao)
Dao = redisSetup(Dao)
})
}
package dao
import "fmt"
func GetUser(){
fmt.Println("555555555555555555555555555555555")
res,err :=Dao.GetDb("micro").QueryString("select service_name from lie_service_info where is_delete = 1 ")
if err != nil{
}
fmt.Println(res)
res,err =Dao.GetDb("micro").QueryString("select service_desc from lie_service_info where is_delete = 1 ")
if err != nil{
}
//fmt.Println(res)
//Dao.GetRedisDbGroup("api").Set("abcdef","123456789",0)
//fmt.Println("555555555555555555555555555555555")
//a := Dao.GetRedisDbGroup("api").Get("abcdef")
//fmt.Println(a.Result())
//
//fmt.Println(Dao.GetRedisDbGroup("api").Get("abcdef"))
fmt.Println(Dao.GetRedisDbGroup("api").Get("abcdef"))
//fmt.Println(Dao.GetRedisDbGroup("api").Get("abcdef"))
//fmt.Println(Dao.GetRedisDbGroup("api").Get("abcdef"))
//fmt.Println(Dao.GetRedisDbGroup("api").Get("abcdef"))
//fmt.Println(Dao.GetRedisDbGroup("api").Get("abcdef"))
//fmt.Println(Dao.GetRedisDbGroup("api").Get("abcdef"))
//fmt.Println(Dao.GetRedisDbGroup("api").Get("abcdef"))
fmt.Println(Dao.GetRedisDbGroup("sku").Get("abcdef").String())
//fmt.Println("555555555555555555555555555555555")
}
#model
\ No newline at end of file
#service
\ No newline at end of file
package service
import "golang-asynctask/app/dao"
func GetData(){
dao.GetUser()
}
package boot
import (
"github.com/ichunt2019/cfg/lib"
xlog "github.com/ichunt2019/lxLog/log"
)
func Init(configPath string,logPath string)(err error){
err = lib.Init(configPath)
if err != nil{
panic(err)
}
xlog.Init(logPath,"crmuser_to_erp")
//初始化数据库
//dao.Init()
return
}
package main
import (
"flag"
"os"
"os/signal"
"syscall"
"golang-asynctask/boot"
"golang-asynctask/cmd/crm/sync_comuser_to_erp/service"
"github.com/ichunt2019/golang-rbmq-sl/utils/rabbitmq"
)
var (
configPath string
logPath string
)
func main(){
flag.StringVar(&configPath, "config", "./config/dev/", "配置文件")
flag.StringVar(&logPath, "logdir", "./logs/", "日志文件存储目录")
flag.Parse()
boot.Init(configPath,logPath)
t := &service.RecvPro{}
rabbitmq.Recv(rabbitmq.QueueExchange{
"a-maxwell",
"maxwell.lie_invoice_company",
"",
"direct",
"amqp://guest:guest@192.168.2.232:5672/",
},t,1)
quit := make(chan os.Signal)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
}
package service
import (
logger "github.com/ichunt2019/lxLog/log"
"golang-asynctask/app/dao/crm"
)
type RecvPro struct {
}
//// 实现消费者 消费消息失败 自动进入延时尝试 尝试3次之后入库db
func (t *RecvPro) Consumer(dataByte []byte) error {
//fmt.Println(string(dataByte))
logger.Instance("crmuser_to_erp").Info(string(dataByte))
error := crm.PushComUserInfoToErp(string(dataByte))
return error
}
//消息已经消费3次 失败了 请进行处理
func (t *RecvPro) FailAction(err error,dataByte []byte) error {
logger.Instance("crmuser_to_erp").Error("任务处理失败了,我要进入db日志库了")
logger.Instance("crmuser_to_erp").Error("任务处理失败了,发送钉钉消息通知主人")
logger.Instance("crmuser_to_erp").Error(string(dataByte))
logger.Instance("crmuser_to_erp").Error("错误原因:%s",err)
return nil
}
title = "TOML 例子"
viewpath = "/home/www/templates/"
pushErpDomain = "http://crm.liexin.net/api/pushComUserRelationToErp"
apiKey = "crm a1b2c3d4e5f6g7h8i9jk"
[owner]
name = "Tom Preston-Werner"
organization = "GitHub"
bio = "GitHub Cofounder & CEO\nLikes tater tots and beer."
dob = 1979-05-27T07:32:00Z # 日期时间是一等公民。为什么不呢?
[servers]
# 你可以依照你的意愿缩进。使用空格或Tab。TOML不会在意。
[servers.alpha]
ip = "10.0.0.1"
dc = "eqdc10"
[servers.beta]
ip = "10.0.0.2"
dc = "eqdc10"
[clients]
data = [["gamma", "delta"],[1, 2]]
[database]
[database.default]
host = "192.168.2.246"
[supplier_no_brand]
3 = [615,757,46596,43172,52,46481,47811,48817]
7 = [47778]
9 = [47778,4589,12369]
[xorm]
ShowSQL = true
[crm]
data_source_name = "liexin_crm:liexin_crm#zsyM@tcp(192.168.1.235:3306)/liexin_crm?charset=utf8&parseTime=true&loc=Asia%2FChongqing"
max_open_conn = 3
max_idle_conn = 1
table_prefix = "lie_"
max_conn_life_time = 100
[liexin]
max_open_conn = 20
max_idle_conn = 10
table_prefix = ""
max_conn_life_time = 100
data_source_name = [
"micro_service:lie_micro_service#zsyM@tcp(192.168.2.232:3306)/lie_micro_service?charset=utf8&parseTime=true&loc=Asia%2FChongqing",
"micro_service:lie_micro_service#zsyM@tcp(192.168.2.232:3306)/lie_micro_service?charset=utf8&parseTime=true&loc=Asia%2FChongqing",
"micro_service:lie_micro_service#zsyM@tcp(192.168.2.232:3306)/lie_micro_service?charset=utf8&parseTime=true&loc=Asia%2FChongqing",
]
[cms]
dns = "micro_service:lie_micro_service#zsyM@tcp(192.168.2.232:3306)/lie_micro_service?charset=utf8&parseTime=true&loc=Asia%2FChongqing"
max_open_conn = 20
max_idle_conn = 10
table_prefix = ""
max_conn_life_time = 100
[sku]
[sku.sku_0]
dns = "micro_service:lie_micro_service#zsyM@tcp(192.168.2.232:3306)/lie_micro_service?charset=utf8&parseTime=true&loc=Asia%2FChongqing"
max_open_conn = 20
max_idle_conn = 10
table_prefix = ""
max_conn_life_time = 100
[sku.sku_1]
dns = "micro_service:lie_micro_service#zsyM@tcp(192.168.2.232:3306)/lie_micro_service?charset=utf8&parseTime=true&loc=Asia%2FChongqing"
max_open_conn = 20
max_idle_conn = 10
table_prefix = ""
max_conn_life_time = 100
#服务注册名称
micro_service_name = "golang_common_demo"
#etcd配置信息 etcd服务的ip端口用户密码
[etcd_config]
addrs = [
"192.168.2.232:2379"
]
username = ""
password = ""
#注册到etcd中的ip 端口 权重信息
[etcd_regist]
ip = "192.168.2.246"
port = 60020
weight = 10
[base]
debug_mode="debug"
time_location="Asia/Chongqing"
[http]
addr ="192.168.2.246:8700" # 监听地址, default ":8700"
read_timeout = 10 # 读取超时时长
write_timeout = 10 # 写入超时时长
max_header_bytes = 20 # 最大的header大小,二进制位长度
[api]
dial_timeout = 20
min_idle_conns = 10
read_timeout = 10
write_timeout = 10
[api.master]
host = "192.168.1.235:6379"
password = "icDb29mLy2s"
[api.slave]
password = "icDb29mLy2s"
host = [
"192.168.1.235:6379",
"192.168.1.237:6379",
]
[sku]
dial_timeout = 20
min_idle_conns = 10
read_timeout = 10
write_timeout = 10
[sku.master]
host = "192.168.1.235:6379"
password = "icDb29mLy2s"
[sku.slave]
password = "icDb29mLy2s"
host = [
"192.168.1.235:6379",
"192.168.1.237:6379",
]
module golang-asynctask
go 1.14
require (
github.com/Chronokeeper/anyxml v0.0.0-20160530174208-54457d8e98c6 // indirect
github.com/CloudyKit/fastprinter v0.0.0-20200109182630-33d98a066a53 // indirect
github.com/CloudyKit/jet v2.1.2+incompatible // indirect
github.com/agrison/go-tablib v0.0.0-20160310143025-4930582c22ee // indirect
github.com/agrison/mxj v0.0.0-20160310142625-1269f8afb3b4 // indirect
github.com/bndr/gotabulate v1.1.2 // indirect
github.com/boj/redistore v0.0.0-20180917114910-cd5dcc76aeff // indirect
github.com/clbanning/mxj v1.8.4 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/fatih/structs v1.1.0 // indirect
github.com/gin-gonic/contrib v0.0.0-20201101042839-6a891bf89f19 // indirect
github.com/gin-gonic/gin v1.6.3
github.com/go-redis/redis/v7 v7.4.0
github.com/go-sql-driver/mysql v1.5.0
github.com/gorilla/sessions v1.2.1 // indirect
github.com/ichunt2019/cfg v0.0.0-20210310074903-4b1bcab17717
github.com/ichunt2019/go-redis-pool v0.0.0-20210305064829-86b9011c57f5
github.com/ichunt2019/golang-rbmq-sl v0.0.0-20200515075131-59a37ab77d7d // indirect
github.com/ichunt2019/ichunt-micro-registry v1.0.1
github.com/ichunt2019/lxLog v0.0.0-20210226024426-781becb3c042
github.com/lib/pq v1.9.0 // indirect
github.com/mattn/go-sqlite3 v1.14.6 // indirect
github.com/spf13/viper v1.7.1
github.com/syndtr/goleveldb v1.0.0 // indirect
github.com/tealeg/xlsx v1.0.5 // indirect
github.com/tidwall/gjson v1.6.8
github.com/xormplus/builder v0.0.0-20200331055651-240ff40009be // indirect
github.com/xormplus/xorm v0.0.0-20210107091022-175d736afaae // indirect
google.golang.org/grpc/examples v0.0.0-20210226164526-c949703b4b98 // indirect
gopkg.in/flosch/pongo2.v3 v3.0.0-20141028000813-5e81b817a0c4 // indirect
sigs.k8s.io/yaml v1.2.0 // indirect
xorm.io/xorm v1.0.7
)
replace google.golang.org/grpc => google.golang.org/grpc v1.26.0
This diff is collapsed. Click to expand it.
package lib
import (
"bytes"
"fmt"
"github.com/spf13/viper"
"io/ioutil"
"os"
"strings"
)
var ConfEnvPath string //配置文件夹
var ConfEnv string //配置环境名 比如:dev prod test
// 解析配置文件目录
//
// 配置文件必须放到一个文件夹中
// 如:config=conf/dev/base.json ConfEnvPath=conf/dev ConfEnv=dev
// 如:config=conf/base.json ConfEnvPath=conf ConfEnv=conf
func ParseConfPath(config string) error {
path := strings.Split(config, "/")
prefix := strings.Join(path[:len(path)-1], "/")
ConfEnvPath = prefix
ConfEnv = path[len(path)-2]
return nil
}
//获取配置环境名
func GetConfEnv() string{
return ConfEnv
}
func GetConfPath(fileName string) string {
return ConfEnvPath + "/" + fileName + ".toml"
}
func GetConfFilePath(fileName string) string {
return ConfEnvPath + "/" + fileName
}
//本地解析文件
func ParseLocalConfig(fileName string, st interface{}) error {
path := GetConfFilePath(fileName)
err := ParseConfig(path, st)
if err != nil {
return err
}
return nil
}
func ParseConfig(path string, conf interface{}) error {
file, err := os.Open(path)
if err != nil {
return fmt.Errorf("Open config %v fail, %v", path, err)
}
data, err := ioutil.ReadAll(file)
if err != nil {
return fmt.Errorf("Read config fail, %v", err)
}
v:=viper.New()
v.SetConfigType("toml")
v.ReadConfig(bytes.NewBuffer(data))
if err:=v.Unmarshal(conf);err!=nil{
return fmt.Errorf("Parse config fail, config:%v, err:%v", string(data), err)
}
return nil
}
package lib
import (
"bytes"
"crypto/md5"
"encoding/binary"
"encoding/hex"
"fmt"
"io/ioutil"
"math/rand"
"net"
"net/http"
"net/url"
"os"
"regexp"
"strings"
"time"
)
var TimeLocation *time.Location
var TimeFormat = "2006-01-02 15:04:05"
var DateFormat = "2006-01-02"
var LocalIP = net.ParseIP("127.0.0.1")
func HttpGET( urlString string, urlParams url.Values, msTimeout int, header http.Header) (*http.Response, []byte, error) {
client := http.Client{
Timeout: time.Duration(msTimeout) * time.Millisecond,
}
urlString = AddGetDataToUrl(urlString, urlParams)
req, err := http.NewRequest("GET", urlString, nil)
if err != nil {
return nil, nil, err
}
if len(header) > 0 {
req.Header = header
}
resp, err := client.Do(req)
if err != nil {
return nil, nil, err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, nil, err
}
return resp, body, nil
}
func HttpPOST(urlString string, urlParams url.Values, msTimeout int, header http.Header, contextType string) (*http.Response, []byte, error) {
client := http.Client{
Timeout: time.Duration(msTimeout) * time.Millisecond,
}
if contextType == "" {
contextType = "application/x-www-form-urlencoded"
}
urlParamEncode := urlParams.Encode()
req, err := http.NewRequest("POST", urlString, strings.NewReader(urlParamEncode))
if len(header) > 0 {
req.Header = header
}
req.Header.Set("Content-Type", contextType)
resp, err := client.Do(req)
if err != nil {
return nil, nil, err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, nil, err
}
return resp, body, nil
}
func HttpJSON(urlString string, jsonContent string, msTimeout int, header http.Header) (*http.Response, []byte, error) {
client := http.Client{
Timeout: time.Duration(msTimeout) * time.Millisecond,
}
req, err := http.NewRequest("POST", urlString, strings.NewReader(jsonContent))
if len(header) > 0 {
req.Header = header
}
req.Header.Set("Content-Type", "application/json")
resp, err := client.Do(req)
if err != nil {
return nil, nil, err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, nil, err
}
return resp, body, nil
}
func AddGetDataToUrl(urlString string, data url.Values) string {
if strings.Contains(urlString, "?") {
urlString = urlString + "&"
} else {
urlString = urlString + "?"
}
return fmt.Sprintf("%s%s", urlString, data.Encode())
}
func addTrace2Header(request *http.Request, trace *TraceContext) *http.Request {
traceId := trace.TraceId
cSpanId := NewSpanId()
if traceId != "" {
request.Header.Set("didi-header-rid", traceId)
}
if cSpanId != "" {
request.Header.Set("didi-header-spanid", cSpanId)
}
trace.CSpanId = cSpanId
return request
}
func GetMd5Hash(text string) string {
hasher := md5.New()
hasher.Write([]byte(text))
return hex.EncodeToString(hasher.Sum(nil))
}
func Encode(data string) (string, error) {
h := md5.New()
_, err := h.Write([]byte(data))
if err != nil {
return "", err
}
return hex.EncodeToString(h.Sum(nil)), nil
}
func ParseServerAddr(serverAddr string) (host, port string) {
serverInfo := strings.Split(serverAddr, ":")
if len(serverInfo) == 2 {
host = serverInfo[0]
port = serverInfo[1]
} else {
host = serverAddr
port = ""
}
return host, port
}
func NewTrace() *TraceContext {
trace := &TraceContext{}
trace.TraceId = GetTraceId()
trace.SpanId = NewSpanId()
return trace
}
func NewSpanId() string {
timestamp := uint32(time.Now().Unix())
ipToLong := binary.BigEndian.Uint32(LocalIP.To4())
b := bytes.Buffer{}
b.WriteString(fmt.Sprintf("%08x", ipToLong^timestamp))
b.WriteString(fmt.Sprintf("%08x", rand.Int31()))
return b.String()
}
func GetTraceId() (traceId string) {
return calcTraceId(LocalIP.String())
}
func calcTraceId(ip string) (traceId string) {
now := time.Now()
timestamp := uint32(now.Unix())
timeNano := now.UnixNano()
pid := os.Getpid()
b := bytes.Buffer{}
netIP := net.ParseIP(ip)
if netIP == nil {
b.WriteString("00000000")
} else {
b.WriteString(hex.EncodeToString(netIP.To4()))
}
b.WriteString(fmt.Sprintf("%08x", timestamp&0xffffffff))
b.WriteString(fmt.Sprintf("%04x", timeNano&0xffff))
b.WriteString(fmt.Sprintf("%04x", pid&0xffff))
b.WriteString(fmt.Sprintf("%06x", rand.Int31n(1<<24)))
b.WriteString("b0") // 末两位标记来源,b0为go
return b.String()
}
func GetLocalIPs() (ips []net.IP) {
interfaceAddr, err := net.InterfaceAddrs()
if err != nil {
return nil
}
for _, address := range interfaceAddr {
ipNet, isValidIpNet := address.(*net.IPNet)
if isValidIpNet && !ipNet.IP.IsLoopback() {
if ipNet.IP.To4() != nil {
ips = append(ips, ipNet.IP)
}
}
}
return ips
}
func InArrayString(s string, arr []string) bool {
for _, i := range arr {
if i == s {
return true
}
}
return false
}
//Substr 字符串的截取
func Substr(str string, start int64, end int64) string {
length := int64(len(str))
if start < 0 || start > length {
return ""
}
if end < 0 {
return ""
}
if end > length {
end = length
}
return string(str[start:end])
}
//利用正则表达式压缩字符串,去除空格或制表符
func CompressStr(str string) string {
if str == "" {
return ""
}
//匹配一个或多个空白符的正则表达式
reg := regexp.MustCompile("\\s+")
return reg.ReplaceAllString(str, "")
}
func ClientIP(r *http.Request) string {
xForwardedFor := r.Header.Get("X-Forwarded-For")
ip := strings.TrimSpace(strings.Split(xForwardedFor, ",")[0])
if ip != "" {
return ip
}
ip = strings.TrimSpace(r.Header.Get("X-Real-Ip"))
if ip != "" {
return ip
}
if ip, _, err := net.SplitHostPort(strings.TrimSpace(r.RemoteAddr)); err == nil {
return ip
}
return ""
}
\ No newline at end of file
package lib
import (
_ "github.com/go-sql-driver/mysql"
"xorm.io/xorm"
"golang-asynctask/app/common/config"
"github.com/ichunt2019/cfg/lib"
)
var DatabaseConMap map[string]*xorm.Engine
func Setup() error {
DatabaseConMap = make(map[string]*xorm.Engine, 0)
DatabaseList := config.BuildDatabaseList()
var err error
//循环生成数据库链接
for conName, db := range DatabaseList {
DatabaseConMap[conName], err = xorm.NewEngine("mysql", db.DataSourceName)
if err != nil {
panic(err)
}
//日志打印SQL
ShowSql := lib.Instance("db").GetBool("xorm.ShowSQL")
DatabaseConMap[conName].ShowSQL(ShowSql)
//设置连接池的空闲数大小
DatabaseConMap[conName].SetMaxIdleConns(db.MaxIdleCons)
//设置最大打开连接数
DatabaseConMap[conName].SetMaxOpenConns(db.MaxOpenCons)
}
return nil
}
func Conn(conName string) *xorm.Engine {
return DatabaseConMap[conName]
}
package lib
import (
"fmt"
dlog "github.com/ichunt2019/lxLog/log"
"strings"
)
// 通用DLTag常量定义
const (
DLTagUndefind = "_undef"
DLTagMySqlFailed = "_com_mysql_failure"
DLTagRedisFailed = "_com_redis_failure"
DLTagMySqlSuccess = "_com_mysql_success"
DLTagRedisSuccess = "_com_redis_success"
DLTagThriftFailed = "_com_thrift_failure"
DLTagThriftSuccess = "_com_thrift_success"
DLTagHTTPSuccess = "_com_http_success"
DLTagHTTPFailed = "_com_http_failure"
DLTagTCPFailed = "_com_tcp_failure"
DLTagRequestIn = "_com_request_in"
DLTagRequestOut = "_com_request_out"
)
const (
_dlTag = "dltag"
_traceId = "traceid"
_spanId = "spanid"
_childSpanId = "cspanid"
_dlTagBizPrefix = "_com_"
_dlTagBizUndef = "_com_undef"
)
var Log *Logger
type Trace struct {
TraceId string
SpanId string
Caller string
SrcMethod string
HintCode int64
HintContent string
}
type TraceContext struct {
Trace
CSpanId string
}
type Logger struct {
}
func (l *Logger) TagInfo(trace *TraceContext, dltag string, m map[string]interface{}) {
m[_dlTag] = checkDLTag(dltag)
m[_traceId] = trace.TraceId
m[_childSpanId] = trace.CSpanId
m[_spanId] = trace.SpanId
dlog.Instance("request").Info(parseParams(m))
}
func (l *Logger) TagWarn(trace *TraceContext, dltag string, m map[string]interface{}) {
m[_dlTag] = checkDLTag(dltag)
m[_traceId] = trace.TraceId
m[_childSpanId] = trace.CSpanId
m[_spanId] = trace.SpanId
dlog.Instance("request").Warn(parseParams(m))
}
func (l *Logger) TagError(trace *TraceContext, dltag string, m map[string]interface{}) {
m[_dlTag] = checkDLTag(dltag)
m[_traceId] = trace.TraceId
m[_childSpanId] = trace.CSpanId
m[_spanId] = trace.SpanId
dlog.Instance("request").Error(parseParams(m))
}
func (l *Logger) TagTrace(trace *TraceContext, dltag string, m map[string]interface{}) {
m[_dlTag] = checkDLTag(dltag)
m[_traceId] = trace.TraceId
m[_childSpanId] = trace.CSpanId
m[_spanId] = trace.SpanId
dlog.Instance("request").Trace(parseParams(m))
}
func (l *Logger) TagDebug(trace *TraceContext, dltag string, m map[string]interface{}) {
m[_dlTag] = checkDLTag(dltag)
m[_traceId] = trace.TraceId
m[_childSpanId] = trace.CSpanId
m[_spanId] = trace.SpanId
dlog.Instance("request").Debug(parseParams(m))
}
func (l *Logger) Close() {
dlog.Instance("request").Close()
}
// 生成业务dltag
func CreateBizDLTag(tagName string) string {
if tagName == "" {
return _dlTagBizUndef
}
return _dlTagBizPrefix + tagName
}
// 校验dltag合法性
func checkDLTag(dltag string) string {
if strings.HasPrefix(dltag, _dlTagBizPrefix) {
return dltag
}
if strings.HasPrefix(dltag, "_com_") {
return dltag
}
if dltag == DLTagUndefind {
return dltag
}
return dltag
}
//map格式化为string
func parseParams(m map[string]interface{}) string {
var dltag string = "_undef"
if _dltag, _have := m["dltag"]; _have {
if __val, __ok := _dltag.(string); __ok {
dltag = __val
}
}
for _key, _val := range m {
if _key == "dltag" {
continue
}
dltag = dltag + "||" + fmt.Sprintf("%v=%+v", _key, _val)
}
dltag = strings.Trim(fmt.Sprintf("%q", dltag), "\"")
return dltag
}
package util
import (
"context"
"golang-asynctask/util/lib"
"github.com/gin-gonic/gin"
)
//错误日志
func ContextWarning(c context.Context, dltag string, m map[string]interface{}) {
v:=c.Value("trace")
traceContext,ok := v.(*lib.TraceContext)
if !ok{
traceContext = lib.NewTrace()
}
lib.Log.TagWarn(traceContext, dltag, m)
}
//错误日志
func ContextError(c context.Context, dltag string, m map[string]interface{}) {
v:=c.Value("trace")
traceContext,ok := v.(*lib.TraceContext)
if !ok{
traceContext = lib.NewTrace()
}
lib.Log.TagError(traceContext, dltag, m)
}
//普通日志
func ContextNotice(c context.Context, dltag string, m map[string]interface{}) {
v:=c.Value("trace")
traceContext,ok := v.(*lib.TraceContext)
if !ok{
traceContext = lib.NewTrace()
}
lib.Log.TagInfo(traceContext, dltag, m)
}
//错误日志
func ComLogWarning(c *gin.Context, dltag string, m map[string]interface{}) {
traceContext := GetGinTraceContext(c)
lib.Log.TagError(traceContext, dltag, m)
}
//普通日志
func ComLogNotice(c *gin.Context, dltag string, m map[string]interface{}) {
traceContext := GetGinTraceContext(c)
lib.Log.TagInfo(traceContext, dltag, m)
}
// 从gin的Context中获取数据
func GetGinTraceContext(c *gin.Context) *lib.TraceContext {
// 防御
if c == nil {
return lib.NewTrace()
}
traceContext, exists := c.Get("trace")
if exists {
if tc, ok := traceContext.(*lib.TraceContext); ok {
return tc
}
}
return lib.NewTrace()
}
// 从Context中获取数据
func GetTraceContext(c context.Context) *lib.TraceContext {
if c == nil {
return lib.NewTrace()
}
traceContext:=c.Value("trace")
if tc, ok := traceContext.(*lib.TraceContext); ok {
return tc
}
return lib.NewTrace()
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment