Commit 31a874d6 by 孙龙

up

parent ea0ee1b1
/go.sum
/cmd/*
/conf/dev/*
/conf/prod/*
/logs/*
......@@ -13,8 +13,10 @@ type HttpRule struct {
Rule string `json:"rule" gorm:"column:rule" description:"type=domain表示域名,type=url_prefix时表示url前缀"`
NeedHttps int `json:"need_https" gorm:"column:need_https" description:"type=支持https 1=支持"`
NeedWebsocket int `json:"need_websocket" gorm:"column:need_websocket" description:"启用websocket 1=启用"`
NeedDirectForward int `json:"need_direct_forward" gorm:"column:need_direct_forward" description:"是否直接转发到外网地址不请求负载均衡 0 否 1 是"`
NeedStripUri int `json:"need_strip_uri" gorm:"column:need_strip_uri" description:"启用strip_uri 1=启用"`
UrlRewrite string `json:"url_rewrite" gorm:"column:url_rewrite" description:"url重写功能,每行一个 "`
DirectForwardUrl string `json:"direct_forward_url" gorm:"column:direct_forward_url" description:"转发外网url"`
HeaderTransfor string `json:"header_transfor" gorm:"column:header_transfor" description:"header转换支持增加(add)、删除(del)、修改(edit) 格式: add headname headvalue "`
}
......
package http_proxy_middleware
import (
"ichunt-micro/dao"
"ichunt-micro/middleware"
"github.com/gin-gonic/gin"
"github.com/pkg/errors"
"ichunt-micro/dao"
"ichunt-micro/middleware"
"strings"
)
......@@ -19,6 +19,12 @@ func HTTPHeaderTransferMiddleware() gin.HandlerFunc {
}
serviceDetail := serverInterface.(*dao.ServiceDetail)
for _,item:=range strings.Split(serviceDetail.HTTPRule.HeaderTransfor,","){
item = strings.TrimSpace(item)
item = strings.Trim(item,"\n")
item = strings.Trim(item,"\r\n")
//fmt.Println("~~~~~~~~~~")
//fmt.Println(item)
//fmt.Println("~~~~~~~~~~")
items:=strings.Split(item," ")
if len(items)!=3{
continue
......
......@@ -23,8 +23,8 @@ func HttpServerRun() {
r := InitRouter(middleware.RecoveryMiddleware(),
middleware.RequestLog())
HttpSrvHandler = &http.Server{
Addr: lib.GetStringConf("proxy.http.addr"),
//Addr: "192.168.1.234:2002",
//Addr: lib.GetStringConf("proxy.http.addr"),
Addr: "192.168.1.234:2002",
Handler: r,
ReadTimeout: time.Duration(lib.GetIntConf("proxy.http.read_timeout")) * time.Second,
WriteTimeout: time.Duration(lib.GetIntConf("proxy.http.write_timeout")) * time.Second,
......
The file could not be displayed because it is too large.
......@@ -5,8 +5,8 @@ import (
"flag"
"ichunt-micro/dao"
"ichunt-micro/golang_common/lib"
"ichunt-micro/http_proxy_router"
"ichunt-micro/golang_common/log"
"ichunt-micro/http_proxy_router"
"ichunt-micro/proxy/load_balance"
"ichunt-micro/registry"
_ "ichunt-micro/registry/etcd"
......
......@@ -45,6 +45,7 @@ func NewMultipleHostsReverseProxy(c *gin.Context) (*httputil.ReverseProxy, error
ServiceName string
target *url.URL
err error
nextAddr string
)
//请求协调者
service,exists := c.Get("service")
......@@ -63,14 +64,22 @@ func NewMultipleHostsReverseProxy(c *gin.Context) (*httputil.ReverseProxy, error
ServiceName = service_.Info.ServiceName
ServiceName = php2go.Trim(ServiceName,"/")
ServiceName = lib.CompressStr(ServiceName)
//服务的 负载均衡方式 轮询方式 0=random 1=round-robin 2=weight_round-robin 3=ip_hash
round_type := service_.LoadBalance.RoundType
nextAddr, err := load_balance.LoadBalanceConfig.GetService(context.TODO(),round_type, ServiceName,lib.ClientIP(c.Request))
if err != nil || nextAddr == "" {
err = errors.New(fmt.Sprintf("从etcd中获取服务 %s 失败",ServiceName))
log.Error("%s", err)
return nil,err
//fmt.Println("service_.HTTPRule.NeedDirectForward",service_.HTTPRule.NeedDirectForward)
//fmt.Println("service_.HTTPRule.DirectForwardUrl",service_.HTTPRule.DirectForwardUrl)
if service_.HTTPRule.NeedDirectForward == 0 {
//服务的 负载均衡方式 轮询方式 0=random 1=round-robin 2=weight_round-robin 3=ip_hash
round_type := service_.LoadBalance.RoundType
nextAddr, err = load_balance.LoadBalanceConfig.GetService(context.TODO(),round_type, ServiceName,lib.ClientIP(c.Request))
if err != nil || nextAddr == "" {
err = errors.New(fmt.Sprintf("从etcd中获取服务 %s 失败",ServiceName))
log.Error("%s", err)
return nil,err
}
}else{
//直接转发url
nextAddr = service_.HTTPRule.DirectForwardUrl
}
director := func(req *http.Request) {
if strings.HasPrefix(nextAddr,"http://") || strings.HasPrefix(nextAddr,"https://") {
target, err = url.Parse(nextAddr)
......@@ -80,16 +89,20 @@ func NewMultipleHostsReverseProxy(c *gin.Context) (*httputil.ReverseProxy, error
if err != nil {
log.Error("func NewMultipleHostsReverseProxy 匿名函数director url.Parse地址解析失败 失败 %s",err)
}
//fmt.Println("target",target)
//http://192.168.1.234:2004
targetQuery := target.RawQuery
//http
req.URL.Scheme = target.Scheme
//192.168.1.234:2004
req.URL.Host = target.Host
req.Host = target.Host
///comment_service/test
//fmt.Println("target.Path",target.Path)
//fmt.Println("req.URL.Path",req.URL.Path)
req.URL.Path = singleJoiningSlash(target.Path, req.URL.Path)
//req.URL.Path = php2go.StrReplace(rule,"",req.URL.Path,-1)
log.Info("target %s",target)
log.Info("targetQuery %s",targetQuery)
log.Info("req.URL.Scheme %s",req.URL.Scheme)
......@@ -101,8 +114,11 @@ func NewMultipleHostsReverseProxy(c *gin.Context) (*httputil.ReverseProxy, error
} else {
req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery
}
//fmt.Println("req.Header",req.Header)
//fmt.Println("req.Method",req.Method)
//fmt.Println("req.Body",req.Body)
if _, ok := req.Header["User-Agent"]; !ok {
req.Header.Set("User-Agent", "user-agent")
req.Header.Set("User-Agent", "")
}
//只在第一代理中设置此header头
......@@ -148,6 +164,7 @@ func NewMultipleHostsReverseProxy(c *gin.Context) (*httputil.ReverseProxy, error
//http.Error(w, "ErrorHandler error:"+err.Error(), 500)
middleware.ResponseError(c,500,err)
}
//fmt.Println(director)
return &httputil.ReverseProxy{Director: director, Transport:transport, ModifyResponse: modifyFunc, ErrorHandler: errFunc},nil
}
......
......@@ -5,7 +5,6 @@ import (
"fmt"
"github.com/syyongx/php2go"
"hash/crc32"
"log"
"net"
"sort"
"strconv"
......@@ -117,6 +116,8 @@ func (c *ConsistentHashBanlance) getLocalIP() (ipv4 string, err error) {
// Get 方法根据给定的对象获取最靠近它的那个节点
func (c *ConsistentHashBanlance) Get(key ...string) (string, error) {
c.lock.Lock()
defer c.lock.Unlock()
if c.IsEmpty() {
return "", errors.New(" node is empty")
......@@ -145,9 +146,9 @@ func (c *ConsistentHashBanlance) Get(key ...string) (string, error) {
func (c *ConsistentHashBanlance) Update() {
c.lock.Lock()
defer c.lock.Unlock()
log.Print("[INFO] 负载均衡器 ip_hash模式 更新负载均衡配置...... ")
//c.lock.Lock()
//defer c.lock.Unlock()
//log.Print("[INFO] 负载均衡器 ip_hash模式 更新负载均衡配置...... ")
//fmt.Println("更新负载均衡配置.....")
allServiceInfo := LoadBalanceConfig.GetLoadBalanceList()
if allServiceInfo == nil || allServiceInfo.ServiceMap == nil{
......
......@@ -4,7 +4,6 @@ import (
"errors"
"fmt"
"github.com/syyongx/php2go"
"log"
"math/rand"
"strconv"
"sync"
......@@ -24,21 +23,20 @@ func (r *RandomBalance) Add(serviceName string,params ...string) error {
if (r.rss == nil){
r.rss = make(map[string][]string,0)
}
if (r.curIndex == nil){
if(r.curIndex == nil){
r.curIndex = make(map[string]int,0)
}
if php2go.InArray(addr,r.rss[serviceName]) == false{
r.rss[serviceName] = append(r.rss[serviceName],addr)
r.curIndex[serviceName] = 0
}
if r.curIndex[serviceName] > len(r.rss[serviceName]){
r.curIndex[serviceName] = 0
}
return nil
}
func (r *RandomBalance) Next(key string) string {
func (r *RandomBalance) Next(serviceName string) string {
r.lock.Lock()
defer r.lock.Unlock()
defer func() {
if r := recover(); r != nil {
return
......@@ -48,18 +46,12 @@ func (r *RandomBalance) Next(key string) string {
return ""
}
if (r.curIndex == nil){
r.curIndex = make(map[string]int,0)
r.curIndex[key] = 0
}
serviceList,ok := r.rss[key]
serviceList,ok := r.rss[serviceName]
if !ok{
return ""
}
r.curIndex[key] = rand.Intn(len(serviceList))
return serviceList[r.curIndex[key]]
randNum := rand.Intn(len(serviceList))
return serviceList[randNum]
}
func (r *RandomBalance) Get(key ...string) (string, error) {
......@@ -73,9 +65,9 @@ func (r *RandomBalance) Get(key ...string) (string, error) {
func (r *RandomBalance) Update() {
r.lock.Lock()
defer r.lock.Unlock()
log.Print("[INFO] 负载均衡器 随机模式 更新负载均衡配置...... ")
//r.lock.Lock()
//defer r.lock.Unlock()
//log.Print("[INFO] 负载均衡器 随机模式 更新负载均衡配置...... ")
//fmt.Println("更新负载均衡配置.....")
allServiceInfo := LoadBalanceConfig.GetLoadBalanceList()
if allServiceInfo == nil || allServiceInfo.ServiceMap == nil{
......@@ -102,9 +94,6 @@ func (r *RandomBalance) Update() {
}
}
r.rss[serviceName] = tmpNodes
if r.curIndex[serviceName] > len(r.rss[serviceName]){
r.curIndex[serviceName] = 0
}
}
......
......@@ -34,7 +34,7 @@ func TestRandomBalance(t *testing.T) {
fmt.Println(rb.Next("ichunt/abc"))
//fmt.Println(rb.Next("ichunt/123"))
//fmt.Println(rb.Next("ichunt/999"))
time.Sleep(time.Second*1)
time.Sleep(time.Millisecond*1)
}
}
......@@ -3,10 +3,7 @@ package load_balance
import (
"errors"
"fmt"
"log"
"sync"
//_ "github.com/ichunt2019/ichunt-micro-service/registry"
"github.com/syyongx/php2go"
"strconv"
)
......@@ -14,12 +11,12 @@ import (
type RoundRobinBalance struct {
curIndex map[string]int
rss map[string][]string
lock sync.Mutex
lock sync.Mutex
}
func (r *RoundRobinBalance) Add(serviceName string,params ...string) error {
r.lock.Lock()
defer r.lock.Unlock()
if len(params) == 0 {
return errors.New("param len 1 at least")
}
......@@ -34,46 +31,53 @@ func (r *RoundRobinBalance) Add(serviceName string,params ...string) error {
if php2go.InArray(addr,r.rss[serviceName]) == false{
r.rss[serviceName] = append(r.rss[serviceName],addr)
r.curIndex[serviceName] = 0
}
if r.curIndex[serviceName] > len(r.rss[serviceName]){
r.curIndex[serviceName] = 0
if _,ok:=r.curIndex[serviceName] ;!ok{
r.curIndex[serviceName] = 0
}
//r.curIndex[serviceName] = 0
}
return nil
}
func (r *RoundRobinBalance) Next(key string) string {
func (r *RoundRobinBalance) Next(serviceName string) string {
defer func() {
if r := recover(); r != nil {
return
}
}()
r.lock.Lock()
defer r.lock.Unlock()
if r.rss == nil || len(r.rss) == 0 {
return ""
}
if _,ok := r.rss[key]; !ok{
if _,ok := r.rss[serviceName]; !ok{
return ""
}
lens := len(r.rss[key]) //节点的个数
if _,ok :=r.curIndex[serviceName];!ok{
return ""
}
currentInt := r.curIndex[serviceName]
lens := len(r.rss[serviceName]) //节点的个数
if (r.curIndex == nil){
r.curIndex = make(map[string]int,0)
r.curIndex[key] = 0
r.curIndex[serviceName] = 0
}
if _,ok :=r.curIndex[key];!ok{
return ""
if currentInt >= lens {
r.curIndex[serviceName] = 0
currentInt = 0
}
if r.curIndex[key] >= lens {
r.curIndex[key] = 0
}
curAddr := r.rss[serviceName][currentInt]
curAddr := r.rss[key][r.curIndex[key]]
r.curIndex[key] = (r.curIndex[key] + 1) % lens
r.curIndex[serviceName] = (currentInt + 1) % lens
return curAddr
}
......@@ -89,9 +93,8 @@ func (r *RoundRobinBalance) Get(key ...string) (string, error) {
更新负载均衡器重缓存服务的节点信息
*/
func (r *RoundRobinBalance) Update() {
r.lock.Lock()
defer r.lock.Unlock()
log.Print("[INFO] 负载均衡器 轮询模式 更新负载均衡配置...... ")
//log.Print("[INFO] 负载均衡器 轮询模式 更新负载均衡配置...... ")
//fmt.Println("更新负载均衡配置.....")
allServiceInfo := LoadBalanceConfig.GetLoadBalanceList()
if allServiceInfo == nil || allServiceInfo.ServiceMap == nil{
......
......@@ -3,7 +3,6 @@ package load_balance
import (
"errors"
"fmt"
"log"
"strconv"
"sync"
)
......@@ -55,6 +54,8 @@ func (r *WeightRoundRobinBalance) Add(serviceName string,params ...string) error
}
func (r *WeightRoundRobinBalance) Next(key string) string {
r.lock.Lock()
defer r.lock.Unlock()
total := 0
var best *WeightNode
if r.rss == nil {
......@@ -99,10 +100,10 @@ func (r *WeightRoundRobinBalance) Get(key ...string) (string, error) {
func (r *WeightRoundRobinBalance) Update() {
r.lock.Lock()
defer r.lock.Unlock()
//r.lock.Lock()
//defer r.lock.Unlock()
log.Print("[INFO] 负载均衡器 加权轮询模式 更新负载均衡配置...... ")
//log.Print("[INFO] 负载均衡器 加权轮询模式 更新负载均衡配置...... ")
allServiceInfo := LoadBalanceConfig.GetLoadBalanceList()
if allServiceInfo == nil || allServiceInfo.ServiceMap == nil{
return
......
......@@ -16,7 +16,7 @@ import (
const (
MaxServiceNum = 50
MaxSyncServiceInterval = time.Second * 60
MaxSyncServiceInterval = time.Second * 30
)
//etcd 注册插件
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment