Commit 87ebb210 by 孙龙

up

parent cda8cdd4
...@@ -30,7 +30,7 @@ var( ...@@ -30,7 +30,7 @@ var(
func Init(reg registry.Registry) *LoadBalanceEtcdConf{ func Init(reg registry.Registry) *LoadBalanceEtcdConf{
once.Do(func() { once.Do(func() {
LoadBalanceConfig = &LoadBalanceEtcdConf{} LoadBalanceConfig = &LoadBalanceEtcdConf{}
rb := LoadBanlanceFactory(LbRandom) rb := LoadBanlanceFactory(LbWeightRoundRobin)
//添加负载均衡器到配置中 //添加负载均衡器到配置中
LoadBalanceConfig.Attach(rb) LoadBalanceConfig.Attach(rb)
LoadBalanceConfig.registry = reg LoadBalanceConfig.registry = reg
...@@ -76,7 +76,7 @@ func (s *LoadBalanceEtcdConf) GetService(ctx context.Context,name string){ ...@@ -76,7 +76,7 @@ func (s *LoadBalanceEtcdConf) GetService(ctx context.Context,name string){
} }
node ,err :=s.GetLoadBalance().Get(name) node ,err :=s.GetLoadBalance().Get(name)
fmt.Println(node) fmt.Println(node,err)
//for _, node := range service.Nodes { //for _, node := range service.Nodes {
// fmt.Printf("服务名:%s, 节点::%#v\n", service.Name, node) // fmt.Printf("服务名:%s, 节点::%#v\n", service.Name, node)
//} //}
......
...@@ -2,8 +2,10 @@ package load_balance ...@@ -2,8 +2,10 @@ package load_balance
import ( import (
"errors" "errors"
"fmt"
"github.com/syyongx/php2go" "github.com/syyongx/php2go"
"hash/crc32" "hash/crc32"
"net"
"sort" "sort"
"strconv" "strconv"
"sync" "sync"
...@@ -28,6 +30,7 @@ func (s UInt32Slice) Swap(i, j int) { ...@@ -28,6 +30,7 @@ func (s UInt32Slice) Swap(i, j int) {
type ConsistentHashBanlance struct { type ConsistentHashBanlance struct {
mux sync.RWMutex mux sync.RWMutex
hash Hash hash Hash
lock sync.Mutex
replicas int //复制因子 replicas int //复制因子
keys map[string]UInt32Slice //已排序的节点hash切片 keys map[string]UInt32Slice //已排序的节点hash切片
hashMap map[string]map[uint32]string //节点哈希和Key的map,键是hash值,值是节点key hashMap map[string]map[uint32]string //节点哈希和Key的map,键是hash值,值是节点key
...@@ -69,7 +72,6 @@ func (c *ConsistentHashBanlance) Add(serviceName string,params ...string) error ...@@ -69,7 +72,6 @@ func (c *ConsistentHashBanlance) Add(serviceName string,params ...string) error
for i := 0; i < c.replicas; i++ { for i := 0; i < c.replicas; i++ {
hash := c.hash([]byte(addr+strconv.Itoa(i))) hash := c.hash([]byte(addr+strconv.Itoa(i)))
//c.keys = append(c.keys, hash)
if php2go.InArray(hash,c.keys[serviceName]) == false{ if php2go.InArray(hash,c.keys[serviceName]) == false{
c.keys[serviceName] = append(c.keys[serviceName], hash) c.keys[serviceName] = append(c.keys[serviceName], hash)
} }
...@@ -80,37 +82,136 @@ func (c *ConsistentHashBanlance) Add(serviceName string,params ...string) error ...@@ -80,37 +82,136 @@ func (c *ConsistentHashBanlance) Add(serviceName string,params ...string) error
c.hashMap[serviceName][hash] = addr c.hashMap[serviceName][hash] = addr
} }
// 对所有虚拟节点的哈希值进行排序,方便之后进行二分查找 // 对所有虚拟节点的哈希值进行排序,方便之后进行二分查找
//sort.Sort(c.keys[serviceName]) sort.Sort(c.keys[serviceName])
//fmt.Println(c.keys)
//fmt.Println(c.hashMap)
return nil return nil
} }
// 获取本机网卡IP
func (c *ConsistentHashBanlance) getLocalIP() (ipv4 string, err error) {
var (
addrs []net.Addr
addr net.Addr
ipNet *net.IPNet // IP地址
isIpNet bool
)
// 获取所有网卡
if addrs, err = net.InterfaceAddrs(); err != nil {
return
}
// 取第一个非lo的网卡IP
for _, addr = range addrs {
// 这个网络地址是IP地址: ipv4, ipv6
if ipNet, isIpNet = addr.(*net.IPNet); isIpNet && !ipNet.IP.IsLoopback() {
// 跳过IPV6
if ipNet.IP.To4() != nil {
ipv4 = ipNet.IP.String() // 192.168.1.1
return
}
}
}
err = errors.New("没有找到网卡IP")
return
}
// Get 方法根据给定的对象获取最靠近它的那个节点 // Get 方法根据给定的对象获取最靠近它的那个节点
func (c *ConsistentHashBanlance) Get(key ...string) (string, error) { func (c *ConsistentHashBanlance) Get(key ...string) (string, error) {
if c.IsEmpty() { if c.IsEmpty() {
return "", errors.New("node is empty") return "", errors.New(" node is empty")
}
//hash := c.hash([]byte(key[1]))
localIP ,err := c.getLocalIP()
//fmt.Println(localIP)
if err != nil{
localIP = "127.0.0.1"
} }
hash := c.hash([]byte(key[1])) hash := c.hash([]byte(localIP))
if _,ok := c.keys[key[0]];!ok{ serviceName := key[0]
if _,ok := c.keys[serviceName];!ok{
return "", errors.New("node is empty") return "", errors.New("node is empty")
} }
// 通过二分查找获取最优节点,第一个"服务器hash"值大于"数据hash"值的就是最优"服务器节点" // 通过二分查找获取最优节点,第一个"服务器hash"值大于"数据hash"值的就是最优"服务器节点"
idx := sort.Search(len(c.keys[key[0]]), func(i int) bool { return c.keys[key[0]][i] >= hash }) idx := sort.Search(len(c.keys[serviceName]), func(i int) bool { return c.keys[serviceName][i] >= hash })
// 如果查找结果 大于 服务器节点哈希数组的最大索引,表示此时该对象哈希值位于最后一个节点之后,那么放入第一个节点中 // 如果查找结果 大于 服务器节点哈希数组的最大索引,表示此时该对象哈希值位于最后一个节点之后,那么放入第一个节点中
if idx == len(c.keys[key[0]]) { if idx == len(c.keys[serviceName]) {
idx = 0 idx = 0
} }
c.mux.RLock() c.mux.RLock()
defer c.mux.RUnlock() defer c.mux.RUnlock()
return c.hashMap[key[0]][c.keys[key[0]][idx]], nil return c.hashMap[serviceName][c.keys[serviceName][idx]], nil
} }
func (c *ConsistentHashBanlance) Update() { func (c *ConsistentHashBanlance) Update() {
c.lock.Lock()
defer c.lock.Unlock()
//fmt.Println("更新负载均衡配置.....")
allServiceInfo := LoadBalanceConfig.GetLoadBalanceList()
if allServiceInfo == nil || allServiceInfo.ServiceMap == nil{
return
}
for serviceName,_ := range c.hashMap{
if _,ok := allServiceInfo.ServiceMap[serviceName];!ok{
delete(c.hashMap,serviceName)
delete(c.keys,serviceName)
continue
}
if len(allServiceInfo.ServiceMap[serviceName].Nodes) == 0{
delete(c.hashMap,serviceName)
delete(c.keys,serviceName)
continue
}
//循环etcd中的节点信息
var (
tmpKeys UInt32Slice
tmpHashMap map[uint32]string
)
tmpKeys = make(UInt32Slice,0)
tmpHashMap = make(map[uint32]string,0)
for _,etcdServiceNode := range allServiceInfo.ServiceMap[serviceName].Nodes{
if etcdServiceNode.IP == ""{
continue
}
for i := 0; i < c.replicas; i++ {
var hash uint32
var addr string
if etcdServiceNode.Port == 0 {
addr = etcdServiceNode.IP
}else{
addr = fmt.Sprintf("%s:%s",etcdServiceNode.IP,strconv.Itoa(etcdServiceNode.Port))
}
hash = c.hash([]byte(addr+strconv.Itoa(i)))
tmpKeys = append(tmpKeys, hash)
tmpHashMap[hash] = addr
}
}
c.keys[serviceName] = tmpKeys
c.hashMap[serviceName] = tmpHashMap
sort.Sort(c.keys[serviceName])
}
for _,service := range allServiceInfo.ServiceMap{
serviceName := service.Name
nodes := service.Nodes
for _,node :=range nodes{
if node.IP == ""{
continue
}
if node.Port == 0{
c.Add(serviceName,node.IP)
}else{
c.Add(serviceName,fmt.Sprintf("%s:%s",node.IP,strconv.Itoa(node.Port)))
}
}
}
} }
...@@ -28,8 +28,12 @@ func (r *RandomBalance) Add(serviceName string,params ...string) error { ...@@ -28,8 +28,12 @@ func (r *RandomBalance) Add(serviceName string,params ...string) error {
} }
if php2go.InArray(addr,r.rss[serviceName]) == false{ if php2go.InArray(addr,r.rss[serviceName]) == false{
r.rss[serviceName] = append(r.rss[serviceName],addr) r.rss[serviceName] = append(r.rss[serviceName],addr)
r.curIndex[serviceName] = 0
} }
r.curIndex[serviceName] = 0 if r.curIndex[serviceName] > len(r.rss[serviceName]){
r.curIndex[serviceName] = 0
}
return nil return nil
} }
...@@ -58,7 +62,11 @@ func (r *RandomBalance) Next(key string) string { ...@@ -58,7 +62,11 @@ func (r *RandomBalance) Next(key string) string {
} }
func (r *RandomBalance) Get(key ...string) (string, error) { func (r *RandomBalance) Get(key ...string) (string, error) {
return r.Next(key[0]), nil node := r.Next(key[0])
if node == ""{
return "",errors.New("沒找到節點信息")
}
return node, nil
} }
...@@ -66,7 +74,7 @@ func (r *RandomBalance) Get(key ...string) (string, error) { ...@@ -66,7 +74,7 @@ func (r *RandomBalance) Get(key ...string) (string, error) {
func (r *RandomBalance) Update() { func (r *RandomBalance) Update() {
r.lock.Lock() r.lock.Lock()
defer r.lock.Unlock() defer r.lock.Unlock()
fmt.Println("更新负载均衡配置.....") //fmt.Println("更新负载均衡配置.....")
allServiceInfo := LoadBalanceConfig.GetLoadBalanceList() allServiceInfo := LoadBalanceConfig.GetLoadBalanceList()
if allServiceInfo == nil || allServiceInfo.ServiceMap == nil{ if allServiceInfo == nil || allServiceInfo.ServiceMap == nil{
return return
...@@ -92,6 +100,9 @@ func (r *RandomBalance) Update() { ...@@ -92,6 +100,9 @@ func (r *RandomBalance) Update() {
} }
} }
r.rss[serviceName] = tmpNodes r.rss[serviceName] = tmpNodes
if r.curIndex[serviceName] > len(r.rss[serviceName]){
r.curIndex[serviceName] = 0
}
} }
......
...@@ -35,6 +35,11 @@ func (r *RoundRobinBalance) Add(serviceName string,params ...string) error { ...@@ -35,6 +35,11 @@ func (r *RoundRobinBalance) Add(serviceName string,params ...string) error {
r.rss[serviceName] = append(r.rss[serviceName],addr) r.rss[serviceName] = append(r.rss[serviceName],addr)
r.curIndex[serviceName] = 0 r.curIndex[serviceName] = 0
} }
if r.curIndex[serviceName] > len(r.rss[serviceName]){
r.curIndex[serviceName] = 0
}
return nil return nil
} }
...@@ -72,7 +77,11 @@ func (r *RoundRobinBalance) Next(key string) string { ...@@ -72,7 +77,11 @@ func (r *RoundRobinBalance) Next(key string) string {
} }
func (r *RoundRobinBalance) Get(key ...string) (string, error) { func (r *RoundRobinBalance) Get(key ...string) (string, error) {
return r.Next(key[0]), nil node := r.Next(key[0])
if node == ""{
return "",errors.New("沒找到節點信息")
}
return node, nil
} }
/* /*
...@@ -82,7 +91,7 @@ func (r *RoundRobinBalance) Update() { ...@@ -82,7 +91,7 @@ func (r *RoundRobinBalance) Update() {
r.lock.Lock() r.lock.Lock()
defer r.lock.Unlock() defer r.lock.Unlock()
fmt.Println("更新负载均衡配置.....") //fmt.Println("更新负载均衡配置.....")
allServiceInfo := LoadBalanceConfig.GetLoadBalanceList() allServiceInfo := LoadBalanceConfig.GetLoadBalanceList()
if allServiceInfo == nil || allServiceInfo.ServiceMap == nil{ if allServiceInfo == nil || allServiceInfo.ServiceMap == nil{
return return
...@@ -108,6 +117,9 @@ func (r *RoundRobinBalance) Update() { ...@@ -108,6 +117,9 @@ func (r *RoundRobinBalance) Update() {
} }
} }
r.rss[serviceName] = tmpNodes r.rss[serviceName] = tmpNodes
if r.curIndex[serviceName] > len(r.rss[serviceName]){
r.curIndex[serviceName] = 0
}
} }
for _,service := range allServiceInfo.ServiceMap{ for _,service := range allServiceInfo.ServiceMap{
......
...@@ -2,11 +2,14 @@ package load_balance ...@@ -2,11 +2,14 @@ package load_balance
import ( import (
"errors" "errors"
"fmt"
"strconv" "strconv"
"sync"
) )
type WeightRoundRobinBalance struct { type WeightRoundRobinBalance struct {
rss map[string][]*WeightNode rss map[string][]*WeightNode
lock sync.Mutex
} }
type WeightNode struct { type WeightNode struct {
...@@ -86,18 +89,89 @@ func (r *WeightRoundRobinBalance) Next(key string) string { ...@@ -86,18 +89,89 @@ func (r *WeightRoundRobinBalance) Next(key string) string {
} }
func (r *WeightRoundRobinBalance) Get(key ...string) (string, error) { func (r *WeightRoundRobinBalance) Get(key ...string) (string, error) {
return r.Next(key[0]), nil node := r.Next(key[0])
if node == ""{
return "",errors.New("沒找到節點信息")
}
return node, nil
} }
func (r *WeightRoundRobinBalance) Update() { func (r *WeightRoundRobinBalance) Update() {
//list := Init().GetLoadBalanceList()
//fmt.Println("更新负载均衡配置..start...") r.lock.Lock()
//for _,serviceList := range list.ServiceMap{ defer r.lock.Unlock()
// for _,node:=range serviceList.Nodes{
// fmt.Printf("服务名%s,服务列表%s",serviceList.Name,node.IP+":"+strconv.Itoa(node.Port)) fmt.Println("更新负载均衡配置.....")
// r.Add(serviceList.Name,node.IP+":"+strconv.Itoa(node.Port),strconv.Itoa(node.Weight)) allServiceInfo := LoadBalanceConfig.GetLoadBalanceList()
// } if allServiceInfo == nil || allServiceInfo.ServiceMap == nil{
//} return
//fmt.Println("更新负载均衡配置..end...") }
//删除不存在的服务节点信息
for serviceName,_ := range r.rss{
if _,ok := allServiceInfo.ServiceMap[serviceName];!ok{
delete(r.rss,serviceName)
continue
}
//循环etcd中的节点信息
var tmpNodes []*WeightNode
tmpNodes = make([]*WeightNode,0)
oldTmpNodes := r.rss[serviceName]
for _,etcdServiceNode := range allServiceInfo.ServiceMap[serviceName].Nodes{
if etcdServiceNode.IP == ""{
continue
}
if etcdServiceNode.Port == 0 {
tmpWeightNode := &WeightNode{
addr:etcdServiceNode.IP,
weight:etcdServiceNode.Weight,
currentWeight:etcdServiceNode.Weight,
effectiveWeight:etcdServiceNode.Weight,
}
for _,v := range oldTmpNodes{
if v.addr == etcdServiceNode.IP{
tmpWeightNode.currentWeight = v.currentWeight
tmpWeightNode.effectiveWeight = v.effectiveWeight
}
}
tmpNodes = append(tmpNodes,tmpWeightNode)
}else{
addr := fmt.Sprintf("%s:%s",etcdServiceNode.IP,strconv.Itoa(etcdServiceNode.Port))
tmpWeightNode := &WeightNode{
addr:addr,
weight:etcdServiceNode.Weight,
currentWeight:etcdServiceNode.Weight,
effectiveWeight:etcdServiceNode.Weight,
}
for _,v := range oldTmpNodes{
if v.addr == tmpWeightNode.addr{
tmpWeightNode.currentWeight = v.currentWeight
tmpWeightNode.effectiveWeight = v.effectiveWeight
}
}
tmpNodes = append(tmpNodes,tmpWeightNode)
}
}
r.rss[serviceName] = tmpNodes
}
for _,service := range allServiceInfo.ServiceMap{
serviceName := service.Name
nodes := service.Nodes
for _,node :=range nodes{
if node.IP == ""{
continue
}
if node.Port == 0{
r.Add(serviceName,node.IP,strconv.Itoa(node.Weight))
}else{
r.Add(serviceName,fmt.Sprintf("%s:%s",node.IP,strconv.Itoa(node.Port)),strconv.Itoa(node.Weight))
}
}
}
} }
...@@ -48,6 +48,7 @@ var ( ...@@ -48,6 +48,7 @@ var (
serviceCh: make(chan *registry.Service, MaxServiceNum), serviceCh: make(chan *registry.Service, MaxServiceNum),
registryServiceMap: make(map[string]*RegisterService, MaxServiceNum), registryServiceMap: make(map[string]*RegisterService, MaxServiceNum),
} }
once sync.Once
) )
//导入包立即执行函数 //导入包立即执行函数
...@@ -292,6 +293,9 @@ func (e *EtcdRegistry) GetService(ctx context.Context, name string) (service *re ...@@ -292,6 +293,9 @@ func (e *EtcdRegistry) GetService(ctx context.Context, name string) (service *re
//更新当前需要获取的服务列表 //更新当前需要获取的服务列表
allServiceInfoNew.ServiceMap[name] = service allServiceInfoNew.ServiceMap[name] = service
e.value.Store(allServiceInfoNew) e.value.Store(allServiceInfoNew)
load_balance.LoadBalanceConfig.UpdateConf(allServiceInfoNew)
return return
} }
......
...@@ -31,11 +31,13 @@ func TestRegister(t *testing.T) { ...@@ -31,11 +31,13 @@ func TestRegister(t *testing.T) {
service.Nodes = append(service.Nodes, &registry.Node{ service.Nodes = append(service.Nodes, &registry.Node{
IP: "127.0.0.1", IP: "127.0.0.1",
Port: 8801, Port: 8801,
Weight:2,
}, },
&registry.Node{ //&registry.Node{
IP: "127.0.0.2", // IP: "127.0.0.2",
Port: 8801, // Port: 8801,
}, // Weight:1,
//},
) )
registryInst.Register(context.TODO(), service) registryInst.Register(context.TODO(), service)
...@@ -76,11 +78,13 @@ func TestRegister2(t *testing.T) { ...@@ -76,11 +78,13 @@ func TestRegister2(t *testing.T) {
service.Nodes = append(service.Nodes, &registry.Node{ service.Nodes = append(service.Nodes, &registry.Node{
IP: "127.0.0.3", IP: "127.0.0.3",
Port: 8801, Port: 8801,
Weight:1,
}, },
&registry.Node{ //&registry.Node{
IP: "127.0.0.4", // IP: "127.0.0.4",
Port: 8801, // Port: 8801,
}, // Weight:2,
//},
) )
registryInst.Register(context.TODO(), service) registryInst.Register(context.TODO(), service)
...@@ -110,10 +114,10 @@ func TestGetService(t *testing.T){ ...@@ -110,10 +114,10 @@ func TestGetService(t *testing.T){
for{ for{
go func() { go func() {
loadBalance.GetService(context.TODO(), "comment_service") loadBalance.GetService(context.TODO(), "comment_service")
loadBalance.GetService(context.TODO(), "comment_service22222") //loadBalance.GetService(context.TODO(), "comment_service22222")
}() }()
//time.Sleep(time.Second*1) time.Sleep(time.Second*1)
time.Sleep(time.Millisecond*10) //time.Sleep(time.Millisecond*100)
} }
} }
...@@ -3,6 +3,7 @@ package main ...@@ -3,6 +3,7 @@ package main
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/ichunt2019/ichunt-micro-service/proxy/load_balance"
"github.com/ichunt2019/ichunt-micro-service/registry" "github.com/ichunt2019/ichunt-micro-service/registry"
_ "github.com/ichunt2019/ichunt-micro-service/registry/etcd" _ "github.com/ichunt2019/ichunt-micro-service/registry/etcd"
"io" "io"
...@@ -15,10 +16,10 @@ import ( ...@@ -15,10 +16,10 @@ import (
) )
func main() { func main() {
rs1 := &RealServer{Addr: "127.0.0.1:2003"} rs1 := &RealServer{Addr: "192.168.2.232:2003"}
rs1.Run() rs1.Run()
rs2 := &RealServer{Addr: "127.0.0.1:2004"} rs2 := &RealServer{Addr: "192.168.2.232:2004"}
rs2.Run() rs2.Run()
//服务注册 //服务注册
...@@ -43,22 +44,27 @@ func register(){ ...@@ -43,22 +44,27 @@ func register(){
registry.WithRegistryPath("/ichuntMicroService/"), registry.WithRegistryPath("/ichuntMicroService/"),
registry.WithHeartBeat(5), registry.WithHeartBeat(5),
) )
load_balance.Init(registryInst)
if err != nil { if err != nil {
fmt.Printf("init registry failed, err:%v", err) fmt.Printf("init registry failed, err:%v", err)
return return
} }
service := &registry.Service{ service := &registry.Service{
Name: "comment_service", Name: "ichuntMicroService",
} }
service.Nodes = append(service.Nodes, &registry.Node{ service.Nodes = append(service.Nodes, &registry.Node{
IP: "127.0.0.1", IP: "192.168.2.232",
Port: 2003, Port: 2003,
Weight:1,
}, },
&registry.Node{ &registry.Node{
IP: "127.0.0.1", IP: "192.168.2.232",
Port: 2004, Port: 2004,
Weight:2,
}, },
) )
registryInst.Register(context.TODO(), service) registryInst.Register(context.TODO(), service)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment