Commit d507859f by 孙龙

up

parent 464450e9
...@@ -6,8 +6,8 @@ import ( ...@@ -6,8 +6,8 @@ import (
) )
type Response struct { type Response struct {
ErrCode int `json:"errcode"` ErrCode int `json:"err_code"`
ErrMsg string `json:"errmsg"` ErrMsg string `json:"err_msg"`
Data interface{} `json:"data"` Data interface{} `json:"data"`
} }
......
...@@ -32,6 +32,7 @@ type request struct { ...@@ -32,6 +32,7 @@ type request struct {
Header string `json:"header"` Header string `json:"header"`
IsHttpJson bool `json:"is_http_json"` IsHttpJson bool `json:"is_http_json"`
Remark string `json:"remark"` Remark string `json:"remark"`
CallbackFunc string `json:"callback_func"`
} }
...@@ -60,6 +61,7 @@ func Callback(ctx *gin.Context) (err error){ ...@@ -60,6 +61,7 @@ func Callback(ctx *gin.Context) (err error){
is_http_json := ctx.DefaultPostForm("isHttpJson","0") is_http_json := ctx.DefaultPostForm("isHttpJson","0")
req.IsHttpJson,_ = strconv.ParseBool(is_http_json) req.IsHttpJson,_ = strconv.ParseBool(is_http_json)
req.Remark = ctx.DefaultPostForm("remark","") req.Remark = ctx.DefaultPostForm("remark","")
req.CallbackFunc = ctx.DefaultPostForm("callback_func","")
req.RequestType = strings.ToLower(req.RequestType) req.RequestType = strings.ToLower(req.RequestType)
req.Method = strings.ToLower(req.Method) req.Method = strings.ToLower(req.Method)
...@@ -82,9 +84,13 @@ func Callback(ctx *gin.Context) (err error){ ...@@ -82,9 +84,13 @@ func Callback(ctx *gin.Context) (err error){
return errors.New("The field `header` is not json string ") return errors.New("The field `header` is not json string ")
} }
if req.RequestType == "hprose" && req.CallbackFunc == ""{
return errors.New("you are using the HProse service, field callback_func is must ")
}
//插入数据库 //插入数据库
sql := " insert into lie_async_task (callback_domain,callback_uri,request_type,data,method,service_type,careate_time,is_http_json,header,remark) values (?,?,?,?,?,?,?,?,?,?) " sql := " insert into lie_async_task (callback_domain,callback_uri,request_type,data,method,service_type,create_time,is_http_json,header,remark,callback_func) values (?,?,?,?,?,?,?,?,?,?,?) "
//fmt.Println(sql) //fmt.Println(sql)
session := http.Dao.GetDb("async_task").NewSession() session := http.Dao.GetDb("async_task").NewSession()
defer session.Close() defer session.Close()
...@@ -99,7 +105,7 @@ func Callback(ctx *gin.Context) (err error){ ...@@ -99,7 +105,7 @@ func Callback(ctx *gin.Context) (err error){
}else{ }else{
field_is_http_json = 0 field_is_http_json = 0
} }
res,err := session.Exec(sql,req.CallbackDomain,req.CallbackUri,req.RequestType,req.Data,req.Method,req.ServiceType,time.Now().Unix(),field_is_http_json,req.Header,req.Remark) res,err := session.Exec(sql,req.CallbackDomain,req.CallbackUri,req.RequestType,req.Data,req.Method,req.ServiceType,time.Now().Unix(),field_is_http_json,req.Header,req.Remark,req.CallbackFunc)
if err != nil{ if err != nil{
return return
} }
......
...@@ -3,12 +3,17 @@ package task ...@@ -3,12 +3,17 @@ package task
import ( import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt"
"github.com/tidwall/gjson" "github.com/tidwall/gjson"
"golang-callback/util/lib" "golang-callback/util/lib"
"net/http" "net/http"
"net/url" "net/url"
"reflect"
"strings" "strings"
logger "github.com/ichunt2019/lxLog/log" logger "github.com/ichunt2019/lxLog/log"
dao "golang-callback/app/dao/task"
"github.com/hprose/hprose-golang/rpc"
"github.com/hprose/hprose-golang/io"
) )
type request struct { type request struct {
...@@ -21,6 +26,7 @@ type request struct { ...@@ -21,6 +26,7 @@ type request struct {
TaskId int64 `json:"task_id"` TaskId int64 `json:"task_id"`
Header string `json:"header"` Header string `json:"header"`
IsHttpJson bool `json:"is_http_json"` IsHttpJson bool `json:"is_http_json"`
CallbackFunc string `json:"callback_func"`
} }
...@@ -38,33 +44,58 @@ func singleJoiningSlash(a, b string) string { ...@@ -38,33 +44,58 @@ func singleJoiningSlash(a, b string) string {
} }
func Forward(data []byte)(err error){ func Forward(data []byte)(err error){
var (
errmsg string
)
requestParams := &request{} requestParams := &request{}
if err = json.Unmarshal(data,requestParams);err != nil{ if err = json.Unmarshal(data,requestParams);err != nil{
return return
} }
switch requestParams.RequestType { switch requestParams.RequestType {
case "http": case "http":
switch requestParams.Method { switch requestParams.Method {
case "get": case "get":
err = httpGet(requestParams) errmsg,err = httpGet(requestParams)
break break
case "post": case "post":
if requestParams.IsHttpJson{ if requestParams.IsHttpJson{
//fmt.Println("json") errmsg,err = httpJson(requestParams)
err = httpJson(requestParams)
}else{ }else{
//fmt.Println("post") errmsg,err = httpPost(requestParams)
err = httpPost(requestParams)
} }
break break
default:
err = errors.New("笨小二不知道怎么处理这条消息了,请联系管理员 ^_^")
break
} }
case "hprose": case "hprose":
errmsg,err = hproseFunc(requestParams)
break break
} }
defer updateStatus(requestParams,err,errmsg)
return return
} }
func updateStatus(requestParams *request,err error,errmsg string){
var (
status int
fail_reason string
)
sql := "update lie_async_task set status = ?,fail_reason = ? where id = ?"
if err != nil{
status = 3
fail_reason = err.Error()
}else{
status = 5
fail_reason = errmsg
}
dao.Dao.GetDb("async_task").Exec(sql,status,fail_reason,requestParams.TaskId)
}
func getUrl(requestParams *request) string{ func getUrl(requestParams *request) string{
var ( var (
...@@ -82,7 +113,70 @@ func getUrl(requestParams *request) string{ ...@@ -82,7 +113,70 @@ func getUrl(requestParams *request) string{
return push_url return push_url
} }
func httpGet(requestParams *request) (err error){
type clientStub struct {
Test func(string) (string,error)
}
func hproseFunc(requestParams *request)(errmsg string,err error){
var responseData interface{}
//var stub *clientStub
url := getUrl(requestParams)
client := rpc.NewHTTPClient(url)
//defer client.Close()
//client.UseService(&stub)
//result,err := stub.Test("hello")
//fmt.Println(result)
//fmt.Println(err)
params := reflect.ValueOf(requestParams.Data)
args := append(make([]reflect.Value,0),[]reflect.Value{params}...)
proxySetting := rpc.InvokeSettings{
Mode: rpc.Serialized,
}
header := http.Header{}
headerParams := gjson.Parse(requestParams.Header).Map()
for k,v :=range headerParams{
header.Set(k,v.String())
}
client.Header = header;
result2,err := client.Invoke(requestParams.CallbackFunc,args,&proxySetting)
if err != nil || len(result2) <= 0{
errmsg = "请求失败"
return
}
io.Unserialize(result2[0].Bytes(),&responseData,true)
//fmt.Println(responseData)
logger.Instance("task").Info("请求url:%s ,请求参数:%s,返回:%s",url,requestParams.Data,responseData)
t := reflect.TypeOf(responseData)
k := t.Kind()
switch k {
case reflect.Slice:
fmt.Println("Slice")
break
case reflect.String:
fmt.Println("String")
returnData := responseData.(string)
err_code := gjson.Parse(string(returnData)).Get("err_code").Int()
msg := gjson.Parse(string(returnData)).Get("err_msg").String()
if err_code != 0{
err = errors.New(msg)
}
fmt.Println(returnData)
break
case reflect.Map:
fmt.Println("Map")
default:
fmt.Println("default")
}
errmsg = fmt.Sprintf("%s",responseData)
return
}
func httpGet(requestParams *request) (errmsg string,err error){
push_url := getUrl(requestParams) push_url := getUrl(requestParams)
header := http.Header{} header := http.Header{}
headerParams := gjson.Parse(requestParams.Header).Map() headerParams := gjson.Parse(requestParams.Header).Map()
...@@ -105,6 +199,7 @@ func httpGet(requestParams *request) (err error){ ...@@ -105,6 +199,7 @@ func httpGet(requestParams *request) (err error){
if err_code != 0{ if err_code != 0{
err = errors.New(msg) err = errors.New(msg)
} }
errmsg = string(returnData)
}else{ }else{
err = errors.New(response.Status) err = errors.New(response.Status)
} }
...@@ -112,7 +207,7 @@ func httpGet(requestParams *request) (err error){ ...@@ -112,7 +207,7 @@ func httpGet(requestParams *request) (err error){
} }
func httpPost(requestParams *request)(err error){ func httpPost(requestParams *request)(errmsg string,err error){
push_url := getUrl(requestParams) push_url := getUrl(requestParams)
urlParams := url.Values{} urlParams := url.Values{}
dataParams := gjson.Parse(requestParams.Data).Map() dataParams := gjson.Parse(requestParams.Data).Map()
...@@ -139,6 +234,7 @@ func httpPost(requestParams *request)(err error){ ...@@ -139,6 +234,7 @@ func httpPost(requestParams *request)(err error){
if err_code != 0{ if err_code != 0{
err = errors.New(msg) err = errors.New(msg)
} }
errmsg = string(returnData)
}else{ }else{
err = errors.New(response.Status) err = errors.New(response.Status)
} }
...@@ -146,7 +242,7 @@ func httpPost(requestParams *request)(err error){ ...@@ -146,7 +242,7 @@ func httpPost(requestParams *request)(err error){
return return
} }
func httpJson(requestParams *request)(err error){ func httpJson(requestParams *request)(errmsg string,err error){
push_url := getUrl(requestParams) push_url := getUrl(requestParams)
urlParams := url.Values{} urlParams := url.Values{}
...@@ -173,6 +269,7 @@ func httpJson(requestParams *request)(err error){ ...@@ -173,6 +269,7 @@ func httpJson(requestParams *request)(err error){
if err_code != 0{ if err_code != 0{
err = errors.New(msg) err = errors.New(msg)
} }
errmsg = string(returnData)
}else if (err != nil){ }else if (err != nil){
err = errors.New(response.Status) err = errors.New(response.Status)
} }
......
...@@ -18,6 +18,7 @@ require ( ...@@ -18,6 +18,7 @@ require (
github.com/go-redis/redis/v7 v7.4.0 github.com/go-redis/redis/v7 v7.4.0
github.com/go-sql-driver/mysql v1.5.0 github.com/go-sql-driver/mysql v1.5.0
github.com/gorilla/sessions v1.2.1 // indirect github.com/gorilla/sessions v1.2.1 // indirect
github.com/hprose/hprose-golang v2.0.6+incompatible
github.com/ichunt2019/cfg v0.0.0-20210310074903-4b1bcab17717 github.com/ichunt2019/cfg v0.0.0-20210310074903-4b1bcab17717
github.com/ichunt2019/go-redis-pool v0.0.0-20210305064829-86b9011c57f5 github.com/ichunt2019/go-redis-pool v0.0.0-20210305064829-86b9011c57f5
github.com/ichunt2019/golang-rbmq-sl v0.0.0-20200515075131-59a37ab77d7d github.com/ichunt2019/golang-rbmq-sl v0.0.0-20200515075131-59a37ab77d7d
......
...@@ -222,6 +222,9 @@ github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2p ...@@ -222,6 +222,9 @@ github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2p
github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc= github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/hprose/hprose-golang v1.5.1 h1:kztiB7gyJ02KEBcrDACo2iNryKCGED+WavmiXKGn7pU=
github.com/hprose/hprose-golang v2.0.6+incompatible h1:sr5ovwMOacwoKThR36WJqV37ia72VOjBD9u9z5xi5Bk=
github.com/hprose/hprose-golang v2.0.6+incompatible/go.mod h1:FfwwCUQFF3f5t03SrzdSghXVZkC01uEJS6Xwzcz0NOo=
github.com/ichunt2019/cfg v0.0.0-20210225081543-828ee9831d70 h1:k6dwr6fDj5P/0cEvSHtgPDdEUA0FiHK/Je231+S2Mto= github.com/ichunt2019/cfg v0.0.0-20210225081543-828ee9831d70 h1:k6dwr6fDj5P/0cEvSHtgPDdEUA0FiHK/Je231+S2Mto=
github.com/ichunt2019/cfg v0.0.0-20210225081543-828ee9831d70/go.mod h1:No53Lxkw+Z8MViyp07p5WVYnKAXSfLqNIxidtWxKhhA= github.com/ichunt2019/cfg v0.0.0-20210225081543-828ee9831d70/go.mod h1:No53Lxkw+Z8MViyp07p5WVYnKAXSfLqNIxidtWxKhhA=
github.com/ichunt2019/cfg v0.0.0-20210310074903-4b1bcab17717 h1:re7Ju7lqA0/hUfp1/rpzboHdsmLbPkTtAtvn/8jzSts= github.com/ichunt2019/cfg v0.0.0-20210310074903-4b1bcab17717 h1:re7Ju7lqA0/hUfp1/rpzboHdsmLbPkTtAtvn/8jzSts=
......
package middleware
import (
"github.com/gin-gonic/gin"
common "golang-callback/app/common/function"
)
func AuthMiddleware() gin.HandlerFunc {
return func(c *gin.Context) {
api_key := c.GetHeader("api-key")
if api_key != "async task qwert12345"{
common.Output(c,500,"恶意请求",nil)
//panic("非法请求");
c.Abort()
return
}
c.Next()
}
}
...@@ -25,13 +25,16 @@ func RecoveryMiddleware() gin.HandlerFunc { ...@@ -25,13 +25,16 @@ func RecoveryMiddleware() gin.HandlerFunc {
if lib.Instance("proxy").GetString("base.debug_mode") != "debug" { if lib.Instance("proxy").GetString("base.debug_mode") != "debug" {
//ResponseError(c, 500, errors.New("内部错误")) //ResponseError(c, 500, errors.New("内部错误"))
common.Output(c,500,"内部错误",nil) common.Output(c,500,"内部错误",nil)
c.Abort()
return return
} else { } else {
common.Output(c,500,"内部错误",errors.New(fmt.Sprint(err))) common.Output(c,500,"内部错误",errors.New(fmt.Sprint(err)))
c.Abort()
return return
} }
} }
}() }()
//fmt.Println("RecoveryMiddleware")
c.Next() c.Next()
} }
} }
\ No newline at end of file
...@@ -24,7 +24,9 @@ func HttpServerRun() { ...@@ -24,7 +24,9 @@ func HttpServerRun() {
r := InitRouter( r := InitRouter(
//middleware.RequestLog(), //middleware.RequestLog(),
middleware.AuthMiddleware(),
middleware.RecoveryMiddleware(), middleware.RecoveryMiddleware(),
) )
HttpSrvHandler = &http.Server{ HttpSrvHandler = &http.Server{
Addr: cfg.Instance("proxy").GetString("http.addr"), Addr: cfg.Instance("proxy").GetString("http.addr"),
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment