Commit 791d44cc by 孙龙

init

parents
/logs
# ide
.idea
/config/*.toml
*.properties
*.exe
*.exe~
gowatch.yml
\ No newline at end of file
# golang-callback_demo
## 如何使用该项目 :
#### 该示例的项目名称为 : go-supplier-sync , 如果你要使用,请自行替换成你的项目名称
### 1.clone项目
git clone ssh://git@119.23.72.7:22611/sunlong_v5/golang-callback_demo.git
### 2.复制项目
1.将golang-callback_demo复制一份成你的自己的项目,比如go-supplier-sync
2.删除.git文件夹,重新初始化git
### 3.修改项目文件夹名称
1.将 golang-callback_demo 改成 go-supplier-sync
2.编辑器批量修改包名 : golang-callback_demo/ => go-supplier-sync/
### 4.修改go.mod文件(重要)
一定要修改go.mod文件,要不然无法跑起来
把go.mod的第一行 : module go-supplier-sync 修改成 module go-supplier-sync
### 5.修改http监听地址
config/proxy.toml.http.addr 修改成你的ip地址或者本地127.0.0.1
### 6.项目启动
go run ./cmd/main.go
go run ./cmd/main.go -config=./config/dev/ -logdir = ./logs/
##项目的目录结构
├── app 项目
│ ├── common 公共文件
│ ├── controller 业务逻辑访问入口
│ ├── dao 数据层操作(对mysql 等数据操作)
│ ├── model 数据层模型 (数据库表定义)
│ └── service 处理业务逻辑
├── boot 启动文件
├── cmd 入口
├── config 配置
├── document
├── util 包管理
├── middleware 中间件
├── router 路由
├── resources
├── go.mod
参考文献:
xorm:https://www.kancloud.cn/xormplus/xorm/413729
go-redis-pool https://github.com/ichunt2019/go-redis-pool
package http
import (
"github.com/ichunt2019/cfg/lib"
)
type BaseDatabase struct {
DataSourceName string
MaxIdleCons int
MaxOpenCons int
Prefix string
}
type GroupDatabase struct {
DataSourceName []string
MaxIdleCons int
MaxOpenCons int
Prefix string
}
//单、主数据 master
func BuildDatabaseList() (DatabaseList map[string]BaseDatabase) {
return map[string]BaseDatabase{
"async_task": {
DataSourceName: lib.Instance("db").GetString("async_task.data_source_name"),
Prefix: lib.Instance("db").GetString("async_task.table_prefix"),
MaxIdleCons:lib.Instance("db").GetInt("async_task.max_idle_conn"),
MaxOpenCons:lib.Instance("db").GetInt("async_task.max_open_conn"),
},
}
}
//主从数据 master slave
func BuildGroupDatabaseList() (DatabaseList map[string]GroupDatabase) {
return map[string]GroupDatabase{
"liexin": {
DataSourceName:lib.Instance("db").GetStringSlice("liexin.data_source_name"),
Prefix: lib.Instance("db").GetString("liexin.table_prefix"),
MaxIdleCons:lib.Instance("db").GetInt("liexin.max_idle_conn"),
MaxOpenCons:lib.Instance("db").GetInt("liexin.max_open_conn"),
},
}
}
package http
import (
"github.com/ichunt2019/cfg/lib"
"time"
)
type RedisGroupDatabase struct {
MasterHost string
Password string //master密码
SlaveHost []string
ReadonlyPassword string //从服务器密码
DialTimeout time.Duration //连接超时
MinIdleConns int //空间链接
ReadTimeout time.Duration //读超时
WriteTimeout time.Duration //写超时
}
//多数据库配置
func BuildRedisGroupConfigs() ( map[string]RedisGroupDatabase) {
//fmt.Println("6666666666")
//fmt.Println(lib.Instance("redis").GetStringSlice("api.slave.host"))
return map[string]RedisGroupDatabase{
"api": {
MasterHost: lib.Instance("redis").GetString("api.master.host"),
Password:lib.Instance("redis").GetString("api.master.password"),
SlaveHost: lib.Instance("redis").GetStringSlice("api.slave.host"),
ReadonlyPassword:lib.Instance("redis").GetString("api.slave.password"),
DialTimeout: time.Duration(lib.Instance("redis").GetInt("api.dial_timeout"))*time.Second,
ReadTimeout: time.Duration(lib.Instance("redis").GetInt("api.read_timeout"))*time.Second,
WriteTimeout: time.Duration(lib.Instance("redis").GetInt("api.write_timeout"))*time.Second,
MinIdleConns:lib.Instance("redis").GetInt("api.min_idle_conns"),
},
"sku": {
MasterHost: lib.Instance("redis").GetString("sku.master.host"),
Password:lib.Instance("redis").GetString("sku.master.password"),
SlaveHost: lib.Instance("redis").GetStringSlice("sku.slave.host"),
ReadonlyPassword:lib.Instance("redis").GetString("sku.slave.password"),
DialTimeout: time.Duration(lib.Instance("redis").GetInt("sku.dial_timeout"))*time.Second,
ReadTimeout: time.Duration(lib.Instance("redis").GetInt("sku.read_timeout"))*time.Second,
WriteTimeout: time.Duration(lib.Instance("redis").GetInt("sku.write_timeout"))*time.Second,
MinIdleConns:lib.Instance("redis").GetInt("sku.min_idle_conns"),
},
}
}
package task
import (
"github.com/ichunt2019/cfg/lib"
)
type BaseDatabase struct {
DataSourceName string
MaxIdleCons int
MaxOpenCons int
Prefix string
}
type GroupDatabase struct {
DataSourceName []string
MaxIdleCons int
MaxOpenCons int
Prefix string
}
//单、主数据 master
func BuildDatabaseList() (DatabaseList map[string]BaseDatabase) {
return map[string]BaseDatabase{
"async_task": {
DataSourceName: lib.Instance("db").GetString("async_task.data_source_name"),
Prefix: lib.Instance("db").GetString("async_task.table_prefix"),
MaxIdleCons:lib.Instance("db").GetInt("async_task.max_idle_conn"),
MaxOpenCons:lib.Instance("db").GetInt("async_task.max_open_conn"),
},
}
}
//主从数据 master slave
func BuildGroupDatabaseList() (DatabaseList map[string]GroupDatabase) {
return map[string]GroupDatabase{
"liexin": {
DataSourceName:lib.Instance("db").GetStringSlice("liexin.data_source_name"),
Prefix: lib.Instance("db").GetString("liexin.table_prefix"),
MaxIdleCons:lib.Instance("db").GetInt("liexin.max_idle_conn"),
MaxOpenCons:lib.Instance("db").GetInt("liexin.max_open_conn"),
},
}
}
package task
import (
"github.com/ichunt2019/cfg/lib"
"time"
)
type RedisGroupDatabase struct {
MasterHost string
Password string //master密码
SlaveHost []string
ReadonlyPassword string //从服务器密码
DialTimeout time.Duration //连接超时
MinIdleConns int //空间链接
ReadTimeout time.Duration //读超时
WriteTimeout time.Duration //写超时
}
//多数据库配置
func BuildRedisGroupConfigs() ( map[string]RedisGroupDatabase) {
//fmt.Println("6666666666")
//fmt.Println(lib.Instance("redis").GetStringSlice("api.slave.host"))
return map[string]RedisGroupDatabase{
"api": {
MasterHost: lib.Instance("redis").GetString("api.master.host"),
Password:lib.Instance("redis").GetString("api.master.password"),
SlaveHost: lib.Instance("redis").GetStringSlice("api.slave.host"),
ReadonlyPassword:lib.Instance("redis").GetString("api.slave.password"),
DialTimeout: time.Duration(lib.Instance("redis").GetInt("api.dial_timeout"))*time.Second,
ReadTimeout: time.Duration(lib.Instance("redis").GetInt("api.read_timeout"))*time.Second,
WriteTimeout: time.Duration(lib.Instance("redis").GetInt("api.write_timeout"))*time.Second,
MinIdleConns:lib.Instance("redis").GetInt("api.min_idle_conns"),
},
"sku": {
MasterHost: lib.Instance("redis").GetString("sku.master.host"),
Password:lib.Instance("redis").GetString("sku.master.password"),
SlaveHost: lib.Instance("redis").GetStringSlice("sku.slave.host"),
ReadonlyPassword:lib.Instance("redis").GetString("sku.slave.password"),
DialTimeout: time.Duration(lib.Instance("redis").GetInt("sku.dial_timeout"))*time.Second,
ReadTimeout: time.Duration(lib.Instance("redis").GetInt("sku.read_timeout"))*time.Second,
WriteTimeout: time.Duration(lib.Instance("redis").GetInt("sku.write_timeout"))*time.Second,
MinIdleConns:lib.Instance("redis").GetInt("sku.min_idle_conns"),
},
}
}
package common
import (
"github.com/gin-gonic/gin"
"strings"
)
type RecommendRequest struct {
GoodsName string `form:"goods_name"`
Attrs string `form:"attrs"`
Encap string `form:"encap"`
Num int `form:"num"`
DeliveryType int `form:"delivery_type"`
Flag int `form:"flag"`
BrandName string `form:"brand_name"`
}
//获取所有请求参数放到字典里面
func GetAllRequestParams(c *gin.Context) (request map[string]string){
request = make(map[string]string)
c.MultipartForm()
for name, value := range c.Request.Form {
if value[0] != "" {
request[name] = strings.TrimSpace(value[0])
}
}
return
}
package common
import (
"github.com/gin-gonic/gin"
"strings"
)
type Response struct {
ErrCode int `json:"errcode"`
ErrMsg string `json:"errmsg"`
Data interface{} `json:"data"`
}
type BomResponse struct {
ErrCode int `json:"error_code"`
ErrMsg string `json:"error_msg"`
Flag int `json:"flag"`
Total int `json:"total"`
Data interface{} `json:"data"`
}
func SuccessResponse(errCode int, errMsg string, data interface{}) Response {
return Response{
ErrCode: errCode,
ErrMsg: errMsg,
Data: data,
}
}
func ErrorResponse(errCode int, errMsg string) Response {
return Response{
ErrCode: errCode,
ErrMsg: errMsg,
Data: []string{},
}
}
//统一输出,里面还要去处理jsonp
func Output(ctx *gin.Context,errCode int, errMsg string, data interface{}) {
if data == nil {
data = []string{}
}
response := Response{
ErrCode: errCode,
ErrMsg: errMsg,
Data: data,
}
//if errCode >= 100 {
// //SearchApiLogger(ctx,errCode, errMsg)
//}
if ctx.DefaultQuery("callback", "") != "" {
ctx.JSONP(200, response)
} else {
referer := ctx.Request.Referer()
referer = strings.TrimRight(referer, "/")
ctx.Header("Access-Control-Allow-Origin", referer)
ctx.Header("Access-Control-Allow-Credentials", "true")
//允许跨站访问的站点域名
//跨域请求头设置
ctx.JSON(200, response)
}
}
//简单的返回数据方法
func ReturnData(ctx *gin.Context,errCode int, errMsg string, data interface{}) {
if data == nil {
data = []string{}
if errCode == 0 {
errCode = 1
}
}
response := Response{
ErrCode: errCode,
ErrMsg: errMsg,
Data: data,
}
ctx.JSON(200, response)
}
package common
import (
"reflect"
"sort"
)
func getCommon(array interface{}) (reflect.Type, reflect.Value, int) {
t := reflect.TypeOf(array)
v := reflect.ValueOf(array)
l := v.Len()
return t, v, l
}
func SortSlice(array interface{}) {
t, v, _ := getCommon(array)
// res := make([]interface{}, l)
if t.Kind() == reflect.Slice {
switch v.Index(0).Kind() {
case reflect.Int:
array := array.([]int)
sort.Ints(array)
case reflect.String:
array := array.([]string)
sort.Strings(array)
case reflect.Float64:
array := array.([]float64)
sort.Float64s(array)
default:
panic("the param can only be int/string/float64 array")
}
} else {
panic("expects parameter 1 to be array")
}
}
package controller
import (
"github.com/gin-gonic/gin"
common "golang-callback/app/common/function"
"golang-callback/app/service/http"
)
func Ping(ctx *gin.Context) {
//xlog.Instance("sku").Info("ping")
////service.GetData()
common.Output(ctx, 0, "success",nil)
}
func Hbsdata(ctx *gin.Context) {
//xlog.Instance("spu").Info("Hbsdata")
common.Output(ctx, 0, "success",nil)
}
func Callback(ctx *gin.Context){
err := http.Callback(ctx)
var msg string
if err != nil{
msg = err.Error()
}else{
msg = ""
}
common.Output(ctx, 0, msg,nil)
}
#dao
\ No newline at end of file
package http
import (
"golang-callback/app/common/config/http"
"sync"
"time"
"xorm.io/xorm"
"github.com/go-redis/redis/v7"
_ "github.com/go-sql-driver/mysql"
"github.com/ichunt2019/cfg/lib"
redisPool "github.com/ichunt2019/go-redis-pool"
)
var (
once sync.Once
Dao *dao
)
type dao struct {
db map[string]*xorm.Engine //非主从mysql数据库 连接池
dbGroup map[string]*xorm.EngineGroup //mysql主从 连接池
redisGroup map[string]*redisPool.Pool //redis 主从 连接池
}
//获取db实例
func (self *dao) GetDb(databases string) *xorm.Engine {
return self.db[databases]
}
//获取主从db实例
//获取主从db实例
func (self *dao) GetDbGroup(databases string) *xorm.EngineGroup {
return self.dbGroup[databases]
}
//获取主从db实例
func (self *dao) GetRedisDbGroup(databases string) *redisPool.Pool {
return self.redisGroup[databases]
}
func mysqlSetup(d *dao) *dao {
var (
err error
)
DatabaseList := http.BuildDatabaseList()
GroupDatabaseList := http.BuildGroupDatabaseList()
if len(GroupDatabaseList) > 0 {
for conName, db := range DatabaseList {
d.db[conName], err = xorm.NewEngine("mysql", db.DataSourceName)
if err != nil {
panic(err)
}
//日志打印SQL
ShowSql := lib.Instance("db").GetBool("xorm.ShowSQL")
d.db[conName].ShowSQL(ShowSql)
//设置连接池的空闲数大小
d.db[conName].SetMaxIdleConns(db.MaxIdleCons)
//设置最大打开连接数
d.db[conName].SetMaxOpenConns(db.MaxOpenCons)
}
}
if len(GroupDatabaseList) > 0 {
for conName, db := range GroupDatabaseList {
d.dbGroup[conName], err = xorm.NewEngineGroup("mysql", db.DataSourceName)
if err != nil {
panic(err)
}
//日志打印SQL
ShowSql := lib.Instance("db").GetBool("xorm.ShowSQL")
d.dbGroup[conName].ShowSQL(ShowSql)
//设置连接池的空闲数大小
d.dbGroup[conName].SetMaxIdleConns(db.MaxIdleCons)
//设置最大打开连接数
d.dbGroup[conName].SetMaxOpenConns(db.MaxOpenCons)
}
}
return d
}
func redisSetup(d *dao) *dao {
var err error
redisGroupList := http.BuildRedisGroupConfigs()
//fmt.Println(redisGroupList)
for redisServerName, redisInfo := range redisGroupList {
d.redisGroup[redisServerName], err = redisPool.NewHA(&redisPool.HAConfig{
Master: redisInfo.MasterHost,
Slaves: redisInfo.SlaveHost,
Password: redisInfo.Password,
ReadonlyPassword: redisInfo.ReadonlyPassword,
Options: &redis.Options{
DialTimeout: redisInfo.DialTimeout, //连接超时
MinIdleConns: redisInfo.MinIdleConns, //空闲链接数
ReadTimeout: redisInfo.ReadTimeout,
WriteTimeout: redisInfo.WriteTimeout,
},
AutoEjectHost: true,//是否弹出故障主机
ServerFailureLimit: 3,//达到失败次数时弹出
ServerRetryTimeout: 5 * time.Second,//在“ServerRetryTimeout”之后重试弹出的主机`
MinServerNum: 0,//保留min服务器 针对从服务器
})
if err != nil {
panic(err)
}
}
return d
}
func Init() {
Dao = &dao{}
once.Do(func() {
//单、主数据 master
Dao.db = make(map[string]*xorm.Engine, 0)
//主从数据 master slave
Dao.dbGroup = make(map[string]*xorm.EngineGroup, 0)
//redis连接池 支持主从 master slave
//Dao.redisGroup = make(map[string]*redisPool.Pool, 0)
Dao = mysqlSetup(Dao)
//Dao = redisSetup(Dao)
})
}
package task
import (
"golang-callback/app/common/config/http"
"golang-callback/app/common/config/task"
"sync"
"time"
"xorm.io/xorm"
"github.com/go-redis/redis/v7"
_ "github.com/go-sql-driver/mysql"
"github.com/ichunt2019/cfg/lib"
redisPool "github.com/ichunt2019/go-redis-pool"
)
var (
once sync.Once
Dao *dao
)
type dao struct {
db map[string]*xorm.Engine //非主从mysql数据库 连接池
dbGroup map[string]*xorm.EngineGroup //mysql主从 连接池
redisGroup map[string]*redisPool.Pool //redis 主从 连接池
}
//获取db实例
func (self *dao) GetDb(databases string) *xorm.Engine {
return self.db[databases]
}
//获取主从db实例
//获取主从db实例
func (self *dao) GetDbGroup(databases string) *xorm.EngineGroup {
return self.dbGroup[databases]
}
//获取主从db实例
func (self *dao) GetRedisDbGroup(databases string) *redisPool.Pool {
return self.redisGroup[databases]
}
func mysqlSetup(d *dao) *dao {
var (
err error
)
DatabaseList := task.BuildDatabaseList()
GroupDatabaseList := task.BuildGroupDatabaseList()
if len(GroupDatabaseList) > 0 {
for conName, db := range DatabaseList {
d.db[conName], err = xorm.NewEngine("mysql", db.DataSourceName)
if err != nil {
panic(err)
}
//日志打印SQL
ShowSql := lib.Instance("db").GetBool("xorm.ShowSQL")
d.db[conName].ShowSQL(ShowSql)
//设置连接池的空闲数大小
d.db[conName].SetMaxIdleConns(db.MaxIdleCons)
//设置最大打开连接数
d.db[conName].SetMaxOpenConns(db.MaxOpenCons)
}
}
if len(GroupDatabaseList) > 0 {
for conName, db := range GroupDatabaseList {
d.dbGroup[conName], err = xorm.NewEngineGroup("mysql", db.DataSourceName)
if err != nil {
panic(err)
}
//日志打印SQL
ShowSql := lib.Instance("db").GetBool("xorm.ShowSQL")
d.dbGroup[conName].ShowSQL(ShowSql)
//设置连接池的空闲数大小
d.dbGroup[conName].SetMaxIdleConns(db.MaxIdleCons)
//设置最大打开连接数
d.dbGroup[conName].SetMaxOpenConns(db.MaxOpenCons)
}
}
return d
}
func redisSetup(d *dao) *dao {
var err error
redisGroupList := http.BuildRedisGroupConfigs()
//fmt.Println(redisGroupList)
for redisServerName, redisInfo := range redisGroupList {
d.redisGroup[redisServerName], err = redisPool.NewHA(&redisPool.HAConfig{
Master: redisInfo.MasterHost,
Slaves: redisInfo.SlaveHost,
Password: redisInfo.Password,
ReadonlyPassword: redisInfo.ReadonlyPassword,
Options: &redis.Options{
DialTimeout: redisInfo.DialTimeout, //连接超时
MinIdleConns: redisInfo.MinIdleConns, //空闲链接数
ReadTimeout: redisInfo.ReadTimeout,
WriteTimeout: redisInfo.WriteTimeout,
},
AutoEjectHost: true,//是否弹出故障主机
ServerFailureLimit: 3,//达到失败次数时弹出
ServerRetryTimeout: 5 * time.Second,//在“ServerRetryTimeout”之后重试弹出的主机`
MinServerNum: 0,//保留min服务器 针对从服务器
})
if err != nil {
panic(err)
}
}
return d
}
func Init() {
Dao = &dao{}
once.Do(func() {
//单、主数据 master
Dao.db = make(map[string]*xorm.Engine, 0)
//主从数据 master slave
Dao.dbGroup = make(map[string]*xorm.EngineGroup, 0)
//redis连接池 支持主从 master slave
//Dao.redisGroup = make(map[string]*redisPool.Pool, 0)
Dao = mysqlSetup(Dao)
//Dao = redisSetup(Dao)
})
}
package dao
import (
"fmt"
"golang-callback/app/dao/http"
)
func GetUser(){
fmt.Println("555555555555555555555555555555555")
res,err := http.Dao.GetDb("micro").QueryString("select service_name from lie_service_info where is_delete = 1 ")
if err != nil{
}
fmt.Println(res)
res,err = http.Dao.GetDb("micro").QueryString("select service_desc from lie_service_info where is_delete = 1 ")
if err != nil{
}
//fmt.Println(res)
//Dao.GetRedisDbGroup("api").Set("abcdef","123456789",0)
//fmt.Println("555555555555555555555555555555555")
//a := Dao.GetRedisDbGroup("api").Get("abcdef")
//fmt.Println(a.Result())
//
//fmt.Println(Dao.GetRedisDbGroup("api").Get("abcdef"))
fmt.Println(http.Dao.GetRedisDbGroup("api").Get("abcdef"))
//fmt.Println(Dao.GetRedisDbGroup("api").Get("abcdef"))
//fmt.Println(Dao.GetRedisDbGroup("api").Get("abcdef"))
//fmt.Println(Dao.GetRedisDbGroup("api").Get("abcdef"))
//fmt.Println(Dao.GetRedisDbGroup("api").Get("abcdef"))
//fmt.Println(Dao.GetRedisDbGroup("api").Get("abcdef"))
//fmt.Println(Dao.GetRedisDbGroup("api").Get("abcdef"))
fmt.Println(http.Dao.GetRedisDbGroup("sku").Get("abcdef").String())
//fmt.Println("555555555555555555555555555555555")
}
#model
\ No newline at end of file
#service
\ No newline at end of file
package http
import (
"encoding/json"
"errors"
"github.com/gin-gonic/gin"
"github.com/ichunt2019/cfg/lib"
"github.com/ichunt2019/golang-rbmq-sl/utils/rabbitmq"
xlog "github.com/ichunt2019/lxLog/log"
"github.com/syyongx/php2go"
"golang-callback/app/dao"
"golang-callback/app/dao/http"
"strconv"
"strings"
"time"
)
func GetData(){
dao.GetUser()
}
type request struct {
CallbackDomain string `json:"callback_domain"`
CallbackUri string `json:"callback_uri"`
RequestType string `json:"request_type"`
Data string `json:"data"`
Method string `json:"method"`
ServiceType int `json:"service_type"`
TaskId int64 `json:"task_id"`
Header string `json:"header"`
IsHttpJson bool `json:"is_http_json"`
Remark string `json:"remark"`
}
func addMsg(msg string) (err error){
queueExchange := rabbitmq.QueueExchange{
lib.Instance("config").GetString("rabbitmq_golang_async_task.queue_name"),
lib.Instance("config").GetString("rabbitmq_golang_async_task.routing_key"),
lib.Instance("config").GetString("rabbitmq_golang_async_task.exchange"),
lib.Instance("config").GetString("rabbitmq_golang_async_task.type"),
lib.Instance("config").GetString("rabbitmq_golang_async_task.dns"),
}
rabbitmq.Send(queueExchange,msg)
return
}
func Callback(ctx *gin.Context) (err error){
req := &request{}
req.CallbackDomain = ctx.DefaultPostForm("callbackDomain","")
req.CallbackUri = ctx.DefaultPostForm("callbackUri","")
req.RequestType = ctx.DefaultPostForm("requestType","http")
req.Data = ctx.DefaultPostForm("data","")
req.Method = ctx.DefaultPostForm("method","get")
serviceType := ctx.DefaultPostForm("serviceType","0")
req.ServiceType,_ = strconv.Atoi(serviceType)
req.Header = ctx.DefaultPostForm("header","")
is_http_json := ctx.DefaultPostForm("isHttpJson","0")
req.IsHttpJson,_ = strconv.ParseBool(is_http_json)
req.Remark = ctx.DefaultPostForm("remark","")
req.RequestType = strings.ToLower(req.RequestType)
req.Method = strings.ToLower(req.Method)
//字段验证
if !php2go.InArray(req.RequestType,[]string{"http","hprose"}){
return errors.New("The field requestType must be in http or hprose, The default is http")
}
if !php2go.InArray(req.Method,[]string{"get","post"}){
return errors.New("The field `method` must be in get or post, The default is get")
}
if !json.Valid([]byte(req.Data)){
return errors.New("The field `data` is not json string ")
}
if !json.Valid([]byte(req.Header)){
return errors.New("The field `header` is not json string ")
}
//插入数据库
sql := " insert into lie_async_task (callback_domain,callback_uri,request_type,data,method,service_type,careate_time,is_http_json,header,remark) values (?,?,?,?,?,?,?,?,?,?) "
//fmt.Println(sql)
session := http.Dao.GetDb("async_task").NewSession()
defer session.Close()
err = session.Begin()
if err != nil{
return
}
xlog.Instance("http").Info("%s",ctx.Request.PostForm)
var field_is_http_json int
if req.IsHttpJson {
field_is_http_json = 1
}else{
field_is_http_json = 0
}
res,err := session.Exec(sql,req.CallbackDomain,req.CallbackUri,req.RequestType,req.Data,req.Method,req.ServiceType,time.Now().Unix(),field_is_http_json,req.Header,req.Remark)
if err != nil{
return
}
req.TaskId,err = res.LastInsertId()
if msg ,err :=json.Marshal(req); err == nil{
//插入队列
addMsg(string(msg))
err = session.Commit()
}else{
err = session.Rollback()
}
return
}
package task
import (
"encoding/json"
"errors"
"github.com/tidwall/gjson"
"golang-callback/util/lib"
"net/http"
"net/url"
"strings"
logger "github.com/ichunt2019/lxLog/log"
)
type request struct {
CallbackDomain string `json:"callback_domain"`
CallbackUri string `json:"callback_uri"`
RequestType string `json:"request_type"`
Data string `json:"data"`
Method string `json:"method"`
ServiceType int `json:"service_type"`
TaskId int64 `json:"task_id"`
Header string `json:"header"`
IsHttpJson bool `json:"is_http_json"`
}
func singleJoiningSlash(a, b string) string {
aslash := strings.HasSuffix(a, "/")
bslash := strings.HasPrefix(b, "/")
switch {
case aslash && bslash:
return a + b[1:]
case !aslash && !bslash:
return a + "/" + b
}
return a + b
}
func Forward(data []byte)(err error){
requestParams := &request{}
if err = json.Unmarshal(data,requestParams);err != nil{
return
}
switch requestParams.RequestType {
case "http":
switch requestParams.Method {
case "get":
err = httpGet(requestParams)
break
case "post":
if requestParams.IsHttpJson{
//fmt.Println("json")
err = httpJson(requestParams)
}else{
//fmt.Println("post")
err = httpPost(requestParams)
}
break
}
case "hprose":
break
}
return
}
func getUrl(requestParams *request) string{
var (
target string
push_url string
)
//请求接口
if !strings.HasPrefix(requestParams.CallbackDomain,"http://") && !strings.HasPrefix(requestParams.CallbackDomain,"https://") {
target = "http://" +requestParams.CallbackDomain
}else{
target = requestParams.CallbackDomain
}
push_url = singleJoiningSlash(target,requestParams.CallbackUri)
return push_url
}
func httpGet(requestParams *request) (err error){
push_url := getUrl(requestParams)
header := http.Header{}
headerParams := gjson.Parse(requestParams.Header).Map()
for k,v :=range headerParams{
header.Set(k,v.String())
}
urlParams := url.Values{}
dataParams := gjson.Parse(requestParams.Data).Map()
for k,v :=range dataParams{
urlParams.Add(k,v.String())
}
response,returnData,err := lib.HttpGET(push_url,urlParams,30*1000,header)
if err != nil {
return
}
logger.Instance("task").Info("请求url:%s,参数:%s,返回:%s",push_url,urlParams,string(returnData))
if err == nil && response.StatusCode == 200{
err_code := gjson.Parse(string(returnData)).Get("err_code").Int()
msg := gjson.Parse(string(returnData)).Get("err_msg").String()
if err_code != 0{
err = errors.New(msg)
}
}else{
err = errors.New(response.Status)
}
return
}
func httpPost(requestParams *request)(err error){
push_url := getUrl(requestParams)
urlParams := url.Values{}
dataParams := gjson.Parse(requestParams.Data).Map()
for k,v :=range dataParams{
urlParams.Add(k,v.String())
}
header := http.Header{}
headerParams := gjson.Parse(requestParams.Header).Map()
header.Set("Content-Type","application/x-www-form-urlencoded")
for k,v :=range headerParams{
header.Set(k,v.String())
}
response,returnData,err := lib.HttpPOST(push_url,urlParams,30*1000,header,"")
if err != nil {
return
}
logger.Instance("task").Info("请求url:%s,参数:%s,返回:%s",push_url,urlParams,string(returnData))
if err == nil && response.StatusCode == 200{
err_code := gjson.Parse(string(returnData)).Get("err_code").Int()
msg := gjson.Parse(string(returnData)).Get("err_msg").String()
if err_code != 0{
err = errors.New(msg)
}
}else{
err = errors.New(response.Status)
}
return
}
func httpJson(requestParams *request)(err error){
push_url := getUrl(requestParams)
urlParams := url.Values{}
dataParams := gjson.Parse(requestParams.Data).Map()
for k,v :=range dataParams{
urlParams.Add(k,v.String())
}
header := http.Header{}
headerParams := gjson.Parse(requestParams.Header).Map()
header.Set("Content-Type","application/json")
for k,v :=range headerParams{
header.Set(k,v.String())
}
response,returnData,err := lib.HttpJSON(push_url,requestParams.Data,30*1000,header)
if err != nil {
return
}
logger.Instance("task").Info("请求url:%s,参数:%s,返回:%s",push_url,urlParams,string(returnData))
if err == nil && response.StatusCode == 200{
err_code := gjson.Parse(string(returnData)).Get("err_code").Int()
msg := gjson.Parse(string(returnData)).Get("err_msg").String()
if err_code != 0{
err = errors.New(msg)
}
}else if (err != nil){
err = errors.New(response.Status)
}
return
}
package task
import (
"github.com/ichunt2019/cfg/lib"
logger "github.com/ichunt2019/lxLog/log"
"github.com/ichunt2019/golang-rbmq-sl/utils/rabbitmq"
)
type RecvPro struct {
}
//// 实现消费者 消费消息失败 自动进入延时尝试 尝试3次之后入库db
func (t *RecvPro) Consumer(dataByte []byte) error {
//fmt.Println(string(dataByte))
logger.Instance("task").Info(string(dataByte))
//error := PushWechatToYunXin(string(dataByte))
error := Forward(dataByte)
return error
}
//消息已经消费3次 失败了 请进行处理
func (t *RecvPro) FailAction(err error,dataByte []byte) error {
logger.Instance("task").Error("任务处理失败了,我要进入db日志库了")
logger.Instance("task").Error("任务处理失败了,发送钉钉消息通知主人")
logger.Instance("task").Error(string(dataByte))
logger.Instance("task").Error("错误原因:%s",err)
return nil
}
func Listen(){
t := &RecvPro{}
rabbitmq.Recv(rabbitmq.QueueExchange{
lib.Instance("config").GetString("rabbitmq_golang_async_task.queue_name"),
lib.Instance("config").GetString("rabbitmq_golang_async_task.routing_key"),
lib.Instance("config").GetString("rabbitmq_golang_async_task.exchange"),
lib.Instance("config").GetString("rabbitmq_golang_async_task.type"),
lib.Instance("config").GetString("rabbitmq_golang_async_task.dns"),
},t,5)
}
package http
import (
"github.com/ichunt2019/cfg/lib"
xlog "github.com/ichunt2019/lxLog/log"
"golang-callback/app/dao/http"
)
func Init(configPath string,logPath string)(err error){
err = lib.Init(configPath)
if err != nil{
panic(err)
}
xlog.Init(logPath,"http")
//初始化数据库
http.Init()
return
}
package task
import (
"github.com/ichunt2019/cfg/lib"
xlog "github.com/ichunt2019/lxLog/log"
"golang-callback/app/dao/task"
)
func Init(configPath string,logPath string)(err error){
err = lib.Init(configPath)
if err != nil{
panic(err)
}
xlog.Init(logPath,"task")
//初始化数据库
task.Init()
return
}
package main
import (
"flag"
"golang-callback/boot/http"
"os"
"os/signal"
"syscall"
"golang-callback/router"
"golang-callback/util/regist_etcd"
)
var (
configPath string
logPath string
)
func main(){
flag.StringVar(&configPath, "config", "./config/dev/", "配置文件")
flag.StringVar(&logPath, "logdir", "./logs/", "日志文件存储目录")
flag.Parse()
http.Init(configPath,logPath)
go func() {
router.HttpServerRun()
}()
//微服务注册
regist_etcd.Init()
quit := make(chan os.Signal)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
router.HttpServerStop()
}
package main
import (
"flag"
"golang-callback/boot/task"
"os"
"os/signal"
"syscall"
t "golang-callback/app/service/task"
)
var (
configPath string
logPath string
)
func main(){
flag.StringVar(&configPath, "config", "./config/dev/", "配置文件")
flag.StringVar(&logPath, "logdir", "./logs/", "日志文件存储目录")
flag.Parse()
task.Init(configPath,logPath)
t.Listen()
quit := make(chan os.Signal)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
}
title = "TOML 例子"
viewpath = "/home/www/templates/"
[rabbitmq_golang_async_task]
queue_name="golang_async_task"
routing_key="golang_async_task"
exchange=""
type="direct"
dns="amqp://guest:guest@192.168.1.252:5672/"
[owner]
name = "Tom Preston-Werner"
organization = "GitHub"
bio = "GitHub Cofounder & CEO\nLikes tater tots and beer."
dob = 1979-05-27T07:32:00Z # 日期时间是一等公民。为什么不呢?
[servers]
# 你可以依照你的意愿缩进。使用空格或Tab。TOML不会在意。
[servers.alpha]
ip = "10.0.0.1"
dc = "eqdc10"
[servers.beta]
ip = "10.0.0.2"
dc = "eqdc10"
[clients]
data = [["gamma", "delta"],[1, 2]]
[database]
[database.default]
host = "192.168.2.246"
[supplier_no_brand]
3 = [615,757,46596,43172,52,46481,47811,48817]
7 = [47778]
9 = [47778,4589,12369]
[xorm]
ShowSQL = false
[async_task]
data_source_name = "lie_async_task:lie_async_task#zsyM@tcp(192.168.1.252:3306)/liexin_async_task?charset=utf8&parseTime=true&loc=Asia%2FChongqing"
max_open_conn = 20
max_idle_conn = 10
table_prefix = ""
max_conn_life_time = 100
[liexin]
max_open_conn = 20
max_idle_conn = 10
table_prefix = ""
max_conn_life_time = 100
data_source_name = [
"micro_service:lie_micro_service#zsyM@tcp(192.168.1.252:3306)/lie_micro_service?charset=utf8&parseTime=true&loc=Asia%2FChongqing",
"micro_service:lie_micro_service#zsyM@tcp(192.168.1.252:3306)/lie_micro_service?charset=utf8&parseTime=true&loc=Asia%2FChongqing",
"micro_service:lie_micro_service#zsyM@tcp(192.168.1.252:3306)/lie_micro_service?charset=utf8&parseTime=true&loc=Asia%2FChongqing",
]
[cms]
dns = "micro_service:lie_micro_service#zsyM@tcp(192.168.1.252:3306)/lie_micro_service?charset=utf8&parseTime=true&loc=Asia%2FChongqing"
max_open_conn = 20
max_idle_conn = 10
table_prefix = ""
max_conn_life_time = 100
[sku]
[sku.sku_0]
dns = "micro_service:lie_micro_service#zsyM@tcp(192.168.1.252:3306)/lie_micro_service?charset=utf8&parseTime=true&loc=Asia%2FChongqing"
max_open_conn = 20
max_idle_conn = 10
table_prefix = ""
max_conn_life_time = 100
[sku.sku_1]
dns = "micro_service:lie_micro_service#zsyM@tcp(192.168.1.252:3306)/lie_micro_service?charset=utf8&parseTime=true&loc=Asia%2FChongqing"
max_open_conn = 20
max_idle_conn = 10
table_prefix = ""
max_conn_life_time = 100
#服务注册名称
micro_service_name = "golang_async_task_http"
#etcd配置信息 etcd服务的ip端口用户密码
[etcd_config]
addrs = [
"192.168.1.252:2379"
]
username = ""
password = ""
#注册到etcd中的ip 端口 权重信息
[etcd_regist]
ip = "192.168.1.168"
port = 60020
weight = 10
[base]
debug_mode="debug"
time_location="Asia/Chongqing"
[http]
addr ="192.168.1.168:8700" # 监听地址, default ":8700"
read_timeout = 10 # 读取超时时长
write_timeout = 10 # 写入超时时长
max_header_bytes = 20 # 最大的header大小,二进制位长度
[api]
dial_timeout = 20
min_idle_conns = 10
read_timeout = 10
write_timeout = 10
[api.master]
host = "192.168.1.235:6379"
password = "icDb29mLy2s"
[api.slave]
password = "icDb29mLy2s"
host = [
"192.168.1.235:6379",
"192.168.1.237:6379",
]
[sku]
dial_timeout = 20
min_idle_conns = 10
read_timeout = 10
write_timeout = 10
[sku.master]
host = "192.168.1.235:6379"
password = "icDb29mLy2s"
[sku.slave]
password = "icDb29mLy2s"
host = [
"192.168.1.235:6379",
"192.168.1.237:6379",
]
module golang-callback
go 1.14
require (
github.com/Chronokeeper/anyxml v0.0.0-20160530174208-54457d8e98c6 // indirect
github.com/CloudyKit/fastprinter v0.0.0-20200109182630-33d98a066a53 // indirect
github.com/CloudyKit/jet v2.1.2+incompatible // indirect
github.com/agrison/go-tablib v0.0.0-20160310143025-4930582c22ee // indirect
github.com/agrison/mxj v0.0.0-20160310142625-1269f8afb3b4 // indirect
github.com/bndr/gotabulate v1.1.2 // indirect
github.com/boj/redistore v0.0.0-20180917114910-cd5dcc76aeff // indirect
github.com/clbanning/mxj v1.8.4 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/fatih/structs v1.1.0 // indirect
github.com/gin-gonic/contrib v0.0.0-20201101042839-6a891bf89f19 // indirect
github.com/gin-gonic/gin v1.6.3
github.com/go-redis/redis/v7 v7.4.0
github.com/go-sql-driver/mysql v1.5.0
github.com/gorilla/sessions v1.2.1 // indirect
github.com/ichunt2019/cfg v0.0.0-20210310074903-4b1bcab17717
github.com/ichunt2019/go-redis-pool v0.0.0-20210305064829-86b9011c57f5
github.com/ichunt2019/golang-rbmq-sl v0.0.0-20200515075131-59a37ab77d7d
github.com/ichunt2019/ichunt-micro-registry v1.0.1
github.com/ichunt2019/lxLog v0.0.0-20210526032824-fd07e8b73ce0
github.com/lib/pq v1.9.0 // indirect
github.com/mattn/go-sqlite3 v1.14.6 // indirect
github.com/spf13/viper v1.7.1
github.com/syndtr/goleveldb v1.0.0 // indirect
github.com/syyongx/php2go v0.9.4
github.com/tealeg/xlsx v1.0.5 // indirect
github.com/tidwall/gjson v1.6.8
github.com/xormplus/builder v0.0.0-20200331055651-240ff40009be // indirect
github.com/xormplus/xorm v0.0.0-20210107091022-175d736afaae // indirect
google.golang.org/grpc/examples v0.0.0-20210226164526-c949703b4b98 // indirect
gopkg.in/flosch/pongo2.v3 v3.0.0-20141028000813-5e81b817a0c4 // indirect
sigs.k8s.io/yaml v1.2.0 // indirect
xorm.io/xorm v1.0.7
)
replace google.golang.org/grpc => google.golang.org/grpc v1.26.0
This diff is collapsed. Click to expand it.
package middleware
import (
"errors"
"fmt"
"github.com/gin-gonic/gin"
common "golang-callback/app/common/function"
"golang-callback/util"
"github.com/ichunt2019/cfg/lib"
"runtime/debug"
)
// RecoveryMiddleware捕获所有panic,并且返回错误信息
func RecoveryMiddleware() gin.HandlerFunc {
return func(c *gin.Context) {
//fmt.Println(lib.Instance("proxy").GetString("base.debug_mode"))
defer func() {
if err := recover(); err != nil {
//先做一下日志记录
//fmt.Println(string(debug.Stack()))
util.ComLogNotice(c, "_com_panic", map[string]interface{}{
"error": fmt.Sprint(err),
"stack": string(debug.Stack()),
})
if lib.Instance("proxy").GetString("base.debug_mode") != "debug" {
//ResponseError(c, 500, errors.New("内部错误"))
common.Output(c,500,"内部错误",nil)
return
} else {
common.Output(c,500,"内部错误",errors.New(fmt.Sprint(err)))
return
}
}
}()
c.Next()
}
}
\ No newline at end of file
package middleware
import (
"bytes"
"github.com/gin-gonic/gin"
"golang-callback/util"
"golang-callback/util/lib"
"io/ioutil"
"time"
)
// 请求进入日志
func RequestInLog(c *gin.Context) {
traceContext := lib.NewTrace()
if traceId := c.Request.Header.Get("com-header-rid"); traceId != "" {
traceContext.TraceId = traceId
}
if spanId := c.Request.Header.Get("com-header-spanid"); spanId != "" {
traceContext.SpanId = spanId
}
c.Set("startExecTime", time.Now())
c.Set("trace", traceContext)
bodyBytes, _ := ioutil.ReadAll(c.Request.Body)
c.Request.Body = ioutil.NopCloser(bytes.NewBuffer(bodyBytes)) // Write body back
lib.Log.TagInfo(traceContext, "_com_request_in", map[string]interface{}{
"uri": c.Request.RequestURI,
"method": c.Request.Method,
"args": c.Request.PostForm,
"body": string(bodyBytes),
"from": c.ClientIP(),
})
}
// 请求输出日志
func RequestOutLog(c *gin.Context) {
// after request
endExecTime := time.Now()
response, _ := c.Get("response")
st, _ := c.Get("startExecTime")
startExecTime, _ := st.(time.Time)
util.ComLogNotice(c, "_com_request_out", map[string]interface{}{
"uri": c.Request.RequestURI,
"method": c.Request.Method,
"args": c.Request.PostForm,
"from": c.ClientIP(),
"response": response,
"proc_time": endExecTime.Sub(startExecTime).Seconds(),
})
}
func RequestLog() gin.HandlerFunc {
return func(c *gin.Context) {
RequestInLog(c)
defer RequestOutLog(c)
c.Next()
}
}
package middleware
import (
"encoding/json"
"fmt"
"golang-callback/util/lib"
"github.com/gin-gonic/gin"
"strings"
)
type ResponseCode int
//1000以下为通用码,1000以上为用户自定义码
const (
SuccessCode ResponseCode = iota
UndefErrorCode
ValidErrorCode
InternalErrorCode
InvalidRequestErrorCode ResponseCode = 401
CustomizeCode ResponseCode = 1000
GROUPALL_SAVE_FLOWERROR ResponseCode = 2001
)
type Response struct {
ErrorCode ResponseCode `json:"err_code"`
ErrorMsg string `json:"err_msg"`
Data interface{} `json:"data"`
TraceId interface{} `json:"trace_id"`
Stack interface{} `json:"stack"`
}
func ResponseError(c *gin.Context, code ResponseCode, err error) {
trace, _ := c.Get("trace")
traceContext, _ := trace.(*lib.TraceContext)
traceId := ""
if traceContext != nil {
traceId = traceContext.TraceId
}
stack := ""
if c.Query("is_debug") == "1" || lib.GetConfEnv() == "dev" {
stack = strings.Replace(fmt.Sprintf("%+v", err), err.Error()+"\n", "", -1)
}
traceId=traceId
stack=stack
//resp := &Response{ErrorCode: code, ErrorMsg: err.Error(), Data: "", TraceId: traceId, Stack: stack}
resp := &Response{ErrorCode: code, ErrorMsg: err.Error(), Data: ""}
c.JSON(200, resp)
response, _ := json.Marshal(resp)
c.Set("response", string(response))
c.AbortWithError(200, err)
}
func ResponseSuccess(c *gin.Context, data interface{}) {
trace, _ := c.Get("trace")
traceContext, _ := trace.(*lib.TraceContext)
traceId := ""
if traceContext != nil {
traceId = traceContext.TraceId
}
resp := &Response{ErrorCode: SuccessCode, ErrorMsg: "", Data: data, TraceId: traceId}
c.JSON(200, resp)
response, _ := json.Marshal(resp)
c.Set("response", string(response))
}
package router
import (
"context"
"github.com/gin-gonic/gin"
cfg "github.com/ichunt2019/cfg/lib"
"golang-callback/middleware"
"log"
"net/http"
"time"
)
var (
HttpSrvHandler *http.Server
HttpsSrvHandler *http.Server
)
func HttpServerRun() {
//debug release test 开发使用debug模式
//fmt.Println(cfg.Instance("proxy").GetString("base.debug_mode"))
gin.SetMode(cfg.Instance("proxy").GetString("base.debug_mode"))
r := InitRouter(
//middleware.RequestLog(),
middleware.RecoveryMiddleware(),
)
HttpSrvHandler = &http.Server{
Addr: cfg.Instance("proxy").GetString("http.addr"),
//Addr: "192.168.1.234:2002",
Handler: r,
ReadTimeout: time.Duration(cfg.Instance("proxy").GetInt("http.read_timeout")) * time.Second,
WriteTimeout: time.Duration(cfg.Instance("proxy").GetInt("http.write_timeout")) * time.Second,
MaxHeaderBytes: 1 << uint(cfg.Instance("proxy").GetInt("http.max_header_bytes")),
}
if err := HttpSrvHandler.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf(" [ERROR] http_proxy_run %s err:%v\n", cfg.Instance("proxy").GetString("http.addr"), err)
}
}
/*
异常退出告警提醒
*/
func HttpServerStop() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := HttpSrvHandler.Shutdown(ctx); err != nil {
log.Printf(" [ERROR] http_proxy_stop err:%v\n", err)
}
log.Printf(" [INFO] http_proxy_stop %v stopped\n", cfg.Instance("proxy").GetString("http.addr"))
}
package router
import (
"github.com/gin-gonic/gin"
"golang-callback/app/controller"
)
func InitRouter(middlewares ...gin.HandlerFunc) *gin.Engine {
//todo 优化点1
//router := gin.Default()
router := gin.New()
router.Use(middlewares...)
router.GET("/ping", controller.Ping)
router.GET("/hbsdata",controller.Hbsdata)
router.POST("/callback", controller.Callback)
router.Use(
gin.Logger(),
)
return router
}
package lib
import (
"bytes"
"fmt"
"github.com/spf13/viper"
"io/ioutil"
"os"
"strings"
)
var ConfEnvPath string //配置文件夹
var ConfEnv string //配置环境名 比如:dev prod test
// 解析配置文件目录
//
// 配置文件必须放到一个文件夹中
// 如:config=conf/dev/base.json ConfEnvPath=conf/dev ConfEnv=dev
// 如:config=conf/base.json ConfEnvPath=conf ConfEnv=conf
func ParseConfPath(config string) error {
path := strings.Split(config, "/")
prefix := strings.Join(path[:len(path)-1], "/")
ConfEnvPath = prefix
ConfEnv = path[len(path)-2]
return nil
}
//获取配置环境名
func GetConfEnv() string{
return ConfEnv
}
func GetConfPath(fileName string) string {
return ConfEnvPath + "/" + fileName + ".toml"
}
func GetConfFilePath(fileName string) string {
return ConfEnvPath + "/" + fileName
}
//本地解析文件
func ParseLocalConfig(fileName string, st interface{}) error {
path := GetConfFilePath(fileName)
err := ParseConfig(path, st)
if err != nil {
return err
}
return nil
}
func ParseConfig(path string, conf interface{}) error {
file, err := os.Open(path)
if err != nil {
return fmt.Errorf("Open config %v fail, %v", path, err)
}
data, err := ioutil.ReadAll(file)
if err != nil {
return fmt.Errorf("Read config fail, %v", err)
}
v:=viper.New()
v.SetConfigType("toml")
v.ReadConfig(bytes.NewBuffer(data))
if err:=v.Unmarshal(conf);err!=nil{
return fmt.Errorf("Parse config fail, config:%v, err:%v", string(data), err)
}
return nil
}
package lib
import (
"bytes"
"crypto/md5"
"encoding/binary"
"encoding/hex"
"fmt"
"io/ioutil"
"math/rand"
"net"
"net/http"
"net/url"
"os"
"regexp"
"strings"
"time"
)
var TimeLocation *time.Location
var TimeFormat = "2006-01-02 15:04:05"
var DateFormat = "2006-01-02"
var LocalIP = net.ParseIP("127.0.0.1")
func HttpGET( urlString string, urlParams url.Values, msTimeout int, header http.Header) (*http.Response, []byte, error) {
client := http.Client{
Timeout: time.Duration(msTimeout) * time.Millisecond,
}
urlString = AddGetDataToUrl(urlString, urlParams)
req, err := http.NewRequest("GET", urlString, nil)
if err != nil {
return nil, nil, err
}
if len(header) > 0 {
req.Header = header
}
resp, err := client.Do(req)
if err != nil {
return nil, nil, err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, nil, err
}
return resp, body, nil
}
func HttpPOST(urlString string, urlParams url.Values, msTimeout int, header http.Header, contextType string) (*http.Response, []byte, error) {
client := http.Client{
Timeout: time.Duration(msTimeout) * time.Millisecond,
}
if contextType == "" {
contextType = "application/x-www-form-urlencoded"
}
urlParamEncode := urlParams.Encode()
req, err := http.NewRequest("POST", urlString, strings.NewReader(urlParamEncode))
if len(header) > 0 {
req.Header = header
}
req.Header.Set("Content-Type", contextType)
resp, err := client.Do(req)
if err != nil {
return nil, nil, err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, nil, err
}
return resp, body, nil
}
func HttpJSON(urlString string, jsonContent string, msTimeout int, header http.Header) (*http.Response, []byte, error) {
client := http.Client{
Timeout: time.Duration(msTimeout) * time.Millisecond,
}
req, err := http.NewRequest("POST", urlString, strings.NewReader(jsonContent))
if len(header) > 0 {
req.Header = header
}
req.Header.Set("Content-Type", "application/json")
resp, err := client.Do(req)
if err != nil {
return nil, nil, err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, nil, err
}
return resp, body, nil
}
func AddGetDataToUrl(urlString string, data url.Values) string {
if strings.Contains(urlString, "?") {
urlString = urlString + "&"
} else {
urlString = urlString + "?"
}
return fmt.Sprintf("%s%s", urlString, data.Encode())
}
func addTrace2Header(request *http.Request, trace *TraceContext) *http.Request {
traceId := trace.TraceId
cSpanId := NewSpanId()
if traceId != "" {
request.Header.Set("didi-header-rid", traceId)
}
if cSpanId != "" {
request.Header.Set("didi-header-spanid", cSpanId)
}
trace.CSpanId = cSpanId
return request
}
func GetMd5Hash(text string) string {
hasher := md5.New()
hasher.Write([]byte(text))
return hex.EncodeToString(hasher.Sum(nil))
}
func Encode(data string) (string, error) {
h := md5.New()
_, err := h.Write([]byte(data))
if err != nil {
return "", err
}
return hex.EncodeToString(h.Sum(nil)), nil
}
func ParseServerAddr(serverAddr string) (host, port string) {
serverInfo := strings.Split(serverAddr, ":")
if len(serverInfo) == 2 {
host = serverInfo[0]
port = serverInfo[1]
} else {
host = serverAddr
port = ""
}
return host, port
}
func NewTrace() *TraceContext {
trace := &TraceContext{}
trace.TraceId = GetTraceId()
trace.SpanId = NewSpanId()
return trace
}
func NewSpanId() string {
timestamp := uint32(time.Now().Unix())
ipToLong := binary.BigEndian.Uint32(LocalIP.To4())
b := bytes.Buffer{}
b.WriteString(fmt.Sprintf("%08x", ipToLong^timestamp))
b.WriteString(fmt.Sprintf("%08x", rand.Int31()))
return b.String()
}
func GetTraceId() (traceId string) {
return calcTraceId(LocalIP.String())
}
func calcTraceId(ip string) (traceId string) {
now := time.Now()
timestamp := uint32(now.Unix())
timeNano := now.UnixNano()
pid := os.Getpid()
b := bytes.Buffer{}
netIP := net.ParseIP(ip)
if netIP == nil {
b.WriteString("00000000")
} else {
b.WriteString(hex.EncodeToString(netIP.To4()))
}
b.WriteString(fmt.Sprintf("%08x", timestamp&0xffffffff))
b.WriteString(fmt.Sprintf("%04x", timeNano&0xffff))
b.WriteString(fmt.Sprintf("%04x", pid&0xffff))
b.WriteString(fmt.Sprintf("%06x", rand.Int31n(1<<24)))
b.WriteString("b0") // 末两位标记来源,b0为go
return b.String()
}
func GetLocalIPs() (ips []net.IP) {
interfaceAddr, err := net.InterfaceAddrs()
if err != nil {
return nil
}
for _, address := range interfaceAddr {
ipNet, isValidIpNet := address.(*net.IPNet)
if isValidIpNet && !ipNet.IP.IsLoopback() {
if ipNet.IP.To4() != nil {
ips = append(ips, ipNet.IP)
}
}
}
return ips
}
func InArrayString(s string, arr []string) bool {
for _, i := range arr {
if i == s {
return true
}
}
return false
}
//Substr 字符串的截取
func Substr(str string, start int64, end int64) string {
length := int64(len(str))
if start < 0 || start > length {
return ""
}
if end < 0 {
return ""
}
if end > length {
end = length
}
return string(str[start:end])
}
//利用正则表达式压缩字符串,去除空格或制表符
func CompressStr(str string) string {
if str == "" {
return ""
}
//匹配一个或多个空白符的正则表达式
reg := regexp.MustCompile("\\s+")
return reg.ReplaceAllString(str, "")
}
func ClientIP(r *http.Request) string {
xForwardedFor := r.Header.Get("X-Forwarded-For")
ip := strings.TrimSpace(strings.Split(xForwardedFor, ",")[0])
if ip != "" {
return ip
}
ip = strings.TrimSpace(r.Header.Get("X-Real-Ip"))
if ip != "" {
return ip
}
if ip, _, err := net.SplitHostPort(strings.TrimSpace(r.RemoteAddr)); err == nil {
return ip
}
return ""
}
\ No newline at end of file
package lib
import (
_ "github.com/go-sql-driver/mysql"
"github.com/ichunt2019/cfg/lib"
"golang-callback/app/common/config/http"
"xorm.io/xorm"
)
var DatabaseConMap map[string]*xorm.Engine
func Setup() error {
DatabaseConMap = make(map[string]*xorm.Engine, 0)
DatabaseList := http.BuildDatabaseList()
var err error
//循环生成数据库链接
for conName, db := range DatabaseList {
DatabaseConMap[conName], err = xorm.NewEngine("mysql", db.DataSourceName)
if err != nil {
panic(err)
}
//日志打印SQL
ShowSql := lib.Instance("db").GetBool("xorm.ShowSQL")
DatabaseConMap[conName].ShowSQL(ShowSql)
//设置连接池的空闲数大小
DatabaseConMap[conName].SetMaxIdleConns(db.MaxIdleCons)
//设置最大打开连接数
DatabaseConMap[conName].SetMaxOpenConns(db.MaxOpenCons)
}
return nil
}
func Conn(conName string) *xorm.Engine {
return DatabaseConMap[conName]
}
package lib
import (
"fmt"
dlog "github.com/ichunt2019/lxLog/log"
"strings"
)
// 通用DLTag常量定义
const (
DLTagUndefind = "_undef"
DLTagMySqlFailed = "_com_mysql_failure"
DLTagRedisFailed = "_com_redis_failure"
DLTagMySqlSuccess = "_com_mysql_success"
DLTagRedisSuccess = "_com_redis_success"
DLTagThriftFailed = "_com_thrift_failure"
DLTagThriftSuccess = "_com_thrift_success"
DLTagHTTPSuccess = "_com_http_success"
DLTagHTTPFailed = "_com_http_failure"
DLTagTCPFailed = "_com_tcp_failure"
DLTagRequestIn = "_com_request_in"
DLTagRequestOut = "_com_request_out"
)
const (
_dlTag = "dltag"
_traceId = "traceid"
_spanId = "spanid"
_childSpanId = "cspanid"
_dlTagBizPrefix = "_com_"
_dlTagBizUndef = "_com_undef"
)
var Log *Logger
type Trace struct {
TraceId string
SpanId string
Caller string
SrcMethod string
HintCode int64
HintContent string
}
type TraceContext struct {
Trace
CSpanId string
}
type Logger struct {
}
func (l *Logger) TagInfo(trace *TraceContext, dltag string, m map[string]interface{}) {
m[_dlTag] = checkDLTag(dltag)
m[_traceId] = trace.TraceId
m[_childSpanId] = trace.CSpanId
m[_spanId] = trace.SpanId
dlog.Instance("request").Info(parseParams(m))
}
func (l *Logger) TagWarn(trace *TraceContext, dltag string, m map[string]interface{}) {
m[_dlTag] = checkDLTag(dltag)
m[_traceId] = trace.TraceId
m[_childSpanId] = trace.CSpanId
m[_spanId] = trace.SpanId
dlog.Instance("request").Warn(parseParams(m))
}
func (l *Logger) TagError(trace *TraceContext, dltag string, m map[string]interface{}) {
m[_dlTag] = checkDLTag(dltag)
m[_traceId] = trace.TraceId
m[_childSpanId] = trace.CSpanId
m[_spanId] = trace.SpanId
dlog.Instance("request").Error(parseParams(m))
}
func (l *Logger) TagTrace(trace *TraceContext, dltag string, m map[string]interface{}) {
m[_dlTag] = checkDLTag(dltag)
m[_traceId] = trace.TraceId
m[_childSpanId] = trace.CSpanId
m[_spanId] = trace.SpanId
dlog.Instance("request").Trace(parseParams(m))
}
func (l *Logger) TagDebug(trace *TraceContext, dltag string, m map[string]interface{}) {
m[_dlTag] = checkDLTag(dltag)
m[_traceId] = trace.TraceId
m[_childSpanId] = trace.CSpanId
m[_spanId] = trace.SpanId
dlog.Instance("request").Debug(parseParams(m))
}
func (l *Logger) Close() {
dlog.Instance("request").Close()
}
// 生成业务dltag
func CreateBizDLTag(tagName string) string {
if tagName == "" {
return _dlTagBizUndef
}
return _dlTagBizPrefix + tagName
}
// 校验dltag合法性
func checkDLTag(dltag string) string {
if strings.HasPrefix(dltag, _dlTagBizPrefix) {
return dltag
}
if strings.HasPrefix(dltag, "_com_") {
return dltag
}
if dltag == DLTagUndefind {
return dltag
}
return dltag
}
//map格式化为string
func parseParams(m map[string]interface{}) string {
var dltag string = "_undef"
if _dltag, _have := m["dltag"]; _have {
if __val, __ok := _dltag.(string); __ok {
dltag = __val
}
}
for _key, _val := range m {
if _key == "dltag" {
continue
}
dltag = dltag + "||" + fmt.Sprintf("%v=%+v", _key, _val)
}
dltag = strings.Trim(fmt.Sprintf("%q", dltag), "\"")
return dltag
}
package util
import (
"context"
"golang-callback/util/lib"
"github.com/gin-gonic/gin"
)
//错误日志
func ContextWarning(c context.Context, dltag string, m map[string]interface{}) {
v:=c.Value("trace")
traceContext,ok := v.(*lib.TraceContext)
if !ok{
traceContext = lib.NewTrace()
}
lib.Log.TagWarn(traceContext, dltag, m)
}
//错误日志
func ContextError(c context.Context, dltag string, m map[string]interface{}) {
v:=c.Value("trace")
traceContext,ok := v.(*lib.TraceContext)
if !ok{
traceContext = lib.NewTrace()
}
lib.Log.TagError(traceContext, dltag, m)
}
//普通日志
func ContextNotice(c context.Context, dltag string, m map[string]interface{}) {
v:=c.Value("trace")
traceContext,ok := v.(*lib.TraceContext)
if !ok{
traceContext = lib.NewTrace()
}
lib.Log.TagInfo(traceContext, dltag, m)
}
//错误日志
func ComLogWarning(c *gin.Context, dltag string, m map[string]interface{}) {
traceContext := GetGinTraceContext(c)
lib.Log.TagError(traceContext, dltag, m)
}
//普通日志
func ComLogNotice(c *gin.Context, dltag string, m map[string]interface{}) {
traceContext := GetGinTraceContext(c)
lib.Log.TagInfo(traceContext, dltag, m)
}
// 从gin的Context中获取数据
func GetGinTraceContext(c *gin.Context) *lib.TraceContext {
// 防御
if c == nil {
return lib.NewTrace()
}
traceContext, exists := c.Get("trace")
if exists {
if tc, ok := traceContext.(*lib.TraceContext); ok {
return tc
}
}
return lib.NewTrace()
}
// 从Context中获取数据
func GetTraceContext(c context.Context) *lib.TraceContext {
if c == nil {
return lib.NewTrace()
}
traceContext:=c.Value("trace")
if tc, ok := traceContext.(*lib.TraceContext); ok {
return tc
}
return lib.NewTrace()
}
package regist_etcd
import (
"github.com/ichunt2019/ichunt-micro-registry/registry"
econfig "github.com/ichunt2019/ichunt-micro-registry/config"
_ "github.com/ichunt2019/ichunt-micro-registry/registry/etcd"
"github.com/ichunt2019/cfg/lib"
)
func Init(){
nodes := []*registry.Node{
{
IP : lib.Instance("proxy").GetString("etcd_regist.ip"),
Port : lib.Instance("proxy").GetInt("etcd_regist.port"),
Weight : lib.Instance("proxy").GetInt("etcd_regist.weight"),
},
}
etcdConfig := registry.EtcdConfig{
Address : lib.Instance("proxy").GetStringSlice("etcd_config.addrs"),
Username : lib.Instance("proxy").GetString("etcd_config.username"),
Password : lib.Instance("proxy").GetString("etcd_config.password"),
Path : "/ichuntMicroService/",
}
econfig.Register(lib.Instance("proxy").GetString("micro_service_name"),etcdConfig,nodes)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment