Commit 737b35b0 by 孙龙

up

parents
module github.com/ichunt2019/ichunt-micro-service
go 1.14
require (
github.com/coreos/etcd v3.3.25+incompatible
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
github.com/gogo/protobuf v1.3.1 // indirect
github.com/golang/protobuf v1.4.2 // indirect
github.com/google/uuid v1.1.2 // indirect
github.com/imroc/req v0.3.0
github.com/syyongx/php2go v0.9.4
go.uber.org/zap v1.16.0 // indirect
google.golang.org/genproto v0.0.0-20200904004341-0bd0a958aa1d // indirect
google.golang.org/grpc v1.31.1 // indirect
)
replace google.golang.org/grpc => google.golang.org/grpc v1.26.0
This diff is collapsed. Click to expand it.
package load_balance
import (
"context"
"fmt"
"github.com/ichunt2019/ichunt-micro-service/registry"
_ "github.com/ichunt2019/ichunt-micro-service/registry"
"sync"
"sync/atomic"
)
type Observer interface {
Update()
}
type LoadBalanceEtcdConf struct {
observers []Observer
value atomic.Value //缓存已经注册的服务节点信息
registry registry.Registry
}
var(
once sync.Once
//实例化对象
LoadBalanceConfig *LoadBalanceEtcdConf
)
func Init(reg registry.Registry) *LoadBalanceEtcdConf{
once.Do(func() {
LoadBalanceConfig = &LoadBalanceEtcdConf{}
rb := LoadBanlanceFactory(LbRoundRobin)
//添加负载均衡器到配置中
LoadBalanceConfig.Attach(rb)
LoadBalanceConfig.registry = reg
allServiceInfo := &registry.AllServiceInfo{
ServiceMap: make(map[string]*registry.Service),
}
//atomic.Value 原子操作 为了防止并发
LoadBalanceConfig.value.Store(allServiceInfo)
})
return LoadBalanceConfig
}
func (s *LoadBalanceEtcdConf) Attach(o Observer) {
s.observers = append(s.observers, o)
}
//更新配置时,通知监听者也更新
func (s *LoadBalanceEtcdConf) UpdateConf(serviceInfo *registry.AllServiceInfo) {
s.value.Store(serviceInfo)
for _, obs := range s.observers {
obs.Update()
}
}
func (s *LoadBalanceEtcdConf) GetLoadBalanceList() *registry.AllServiceInfo{
return s.value.Load().(*registry.AllServiceInfo)
}
func (s *LoadBalanceEtcdConf) GetLoadBalance() LoadBalance{
return s.observers[0].(LoadBalance)
}
func (s *LoadBalanceEtcdConf) GetRegistry() registry.Registry{
return s.registry
}
func (s *LoadBalanceEtcdConf) GetService(ctx context.Context,name string){
regService := s.GetRegistry()
_, err := regService.GetService(context.TODO(),name)
if err != nil {
fmt.Printf("get service failed, err:%v", err)
return
}
node ,err :=s.GetLoadBalance().Get(name)
fmt.Println(node)
//for _, node := range service.Nodes {
// fmt.Printf("服务名:%s, 节点::%#v\n", service.Name, node)
//}
}
package load_balance
import (
"errors"
"github.com/syyongx/php2go"
"hash/crc32"
"sort"
"strconv"
"sync"
)
type Hash func(data []byte) uint32
type UInt32Slice []uint32
func (s UInt32Slice) Len() int {
return len(s)
}
func (s UInt32Slice) Less(i, j int) bool {
return s[i] < s[j]
}
func (s UInt32Slice) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
type ConsistentHashBanlance struct {
mux sync.RWMutex
hash Hash
replicas int //复制因子
keys map[string]UInt32Slice //已排序的节点hash切片
hashMap map[string]map[uint32]string //节点哈希和Key的map,键是hash值,值是节点key
}
func NewConsistentHashBanlance(replicas int, fn Hash) *ConsistentHashBanlance {
m := &ConsistentHashBanlance{
replicas: replicas,
hash: fn,
hashMap: make(map[string]map[uint32]string),
}
if m.hash == nil {
//最多32位,保证是一个2^32-1环
m.hash = crc32.ChecksumIEEE
}
return m
}
// 验证是否为空
func (c *ConsistentHashBanlance) IsEmpty() bool {
return len(c.keys) == 0
}
// Add 方法用来添加缓存节点,参数为节点key,比如使用IP
func (c *ConsistentHashBanlance) Add(serviceName string,params ...string) error {
if len(params) == 0 {
return errors.New("param len 1 at least")
}
addr := params[0]
c.mux.Lock()
defer c.mux.Unlock()
if c.keys == nil{
c.keys = make(map[string]UInt32Slice)
}
// 结合复制因子计算所有虚拟节点的hash值,并存入m.keys中,同时在m.hashMap中保存哈希值和key的映射
for i := 0; i < c.replicas; i++ {
hash := c.hash([]byte(addr+strconv.Itoa(i)))
//c.keys = append(c.keys, hash)
if php2go.InArray(hash,c.keys[serviceName]) == false{
c.keys[serviceName] = append(c.keys[serviceName], hash)
}
if c.hashMap[serviceName] == nil{
c.hashMap[serviceName] = make(map[uint32]string)
c.hashMap[serviceName] = map[uint32]string{hash:addr}
}
c.hashMap[serviceName][hash] = addr
}
// 对所有虚拟节点的哈希值进行排序,方便之后进行二分查找
//sort.Sort(c.keys[serviceName])
//fmt.Println(c.keys)
//fmt.Println(c.hashMap)
return nil
}
// Get 方法根据给定的对象获取最靠近它的那个节点
func (c *ConsistentHashBanlance) Get(key ...string) (string, error) {
if c.IsEmpty() {
return "", errors.New("node is empty")
}
hash := c.hash([]byte(key[1]))
if _,ok := c.keys[key[0]];!ok{
return "", errors.New("node is empty")
}
// 通过二分查找获取最优节点,第一个"服务器hash"值大于"数据hash"值的就是最优"服务器节点"
idx := sort.Search(len(c.keys[key[0]]), func(i int) bool { return c.keys[key[0]][i] >= hash })
// 如果查找结果 大于 服务器节点哈希数组的最大索引,表示此时该对象哈希值位于最后一个节点之后,那么放入第一个节点中
if idx == len(c.keys[key[0]]) {
idx = 0
}
c.mux.RLock()
defer c.mux.RUnlock()
return c.hashMap[key[0]][c.keys[key[0]][idx]], nil
}
func (c *ConsistentHashBanlance) Update() {
}
package load_balance
import (
"fmt"
"testing"
)
func TestNewConsistentHashBanlance(t *testing.T) {
rb := NewConsistentHashBanlance(50, nil)
rb.Add("ichunt/abc","127.0.0.1:2001") //0
rb.Add("ichunt/abc","127.0.0.1:2001") //0
rb.Add("ichunt/abc","127.0.0.1:2001") //0
rb.Add("ichunt/abc","172.156.26.3:2002") //1
rb.Add("ichunt/abc","172.156.26.3:2002") //1
rb.Add("ichunt/abc","172.156.26.3:2002") //1
rb.Add("ichunt/abc","127.0.0.2:2003") //2
rb.Add("ichunt/abc","127.0.0.2:2003") //2
//rb.Add("ichunt/abc","127.0.0.1:2006") //2
//rb.Add("ichunt/abc","127.0.0.1:2007") //2
//url hash
//fmt.Println(rb.Get("http://127.0.0.1:2002/base/getinfo"))
//fmt.Println(rb.Get("http://127.0.0.1:2002/base/getinfo"))
//fmt.Println(rb.Get("http://127.0.0.1:2002/base/getinfo"))
//fmt.Println(rb.Get("http://127.0.0.1:2002/base/changepwd"))
//
////ip hash
fmt.Println(rb.Get("ichunt/abc","http://127.0.0.1:2002/base/getinfo"))
fmt.Println(rb.Get("ichunt/abc","http://127.0.0.1:2002/base/changepwd"))
fmt.Println(rb.Get("ichunt/abc","http://127.0.0.1:2002/base/getinfo"))
fmt.Println(rb.Get("ichunt/abc","http://127.0.0.1:2002/base/changepwd"))
fmt.Println(rb.Get("ichunt/abc","172.156.26.3:2002ggg/abase/agetinfo555"))
fmt.Println(rb.Get("ichunt/abc","172.156.26.3:2002fsdggg/abase/agetinfo555"))
}
\ No newline at end of file
package load_balance
type LbType int
const (
LbRandom LbType = iota
LbRoundRobin
LbWeightRoundRobin
LbConsistentHash
)
func LoadBanlanceFactory(lbType LbType) LoadBalance {
switch lbType {
case LbRandom:
return &RandomBalance{}
case LbConsistentHash:
return NewConsistentHashBanlance(50, nil)
case LbRoundRobin:
return &RoundRobinBalance{}
case LbWeightRoundRobin:
return &WeightRoundRobinBalance{}
default:
return &RandomBalance{}
}
}
package load_balance
type LoadBalance interface {
Add(string,...string) error
Get(...string) (string, error)
//后期服务发现补充
Update()
}
package load_balance
import (
"errors"
"github.com/syyongx/php2go"
"math/rand"
)
type RandomBalance struct {
curIndex map[string]int
rss map[string][]string
}
func (r *RandomBalance) Add(serviceName string,params ...string) error {
if len(params) == 0 {
return errors.New("param len 1 at least")
}
addr := params[0]
if (r.rss == nil){
r.rss = make(map[string][]string,0)
}
if (r.curIndex == nil){
r.curIndex = make(map[string]int,0)
}
if php2go.InArray(addr,r.rss[serviceName]) == false{
r.rss[serviceName] = append(r.rss[serviceName],addr)
}
r.curIndex[serviceName] = 0
return nil
}
func (r *RandomBalance) Next(key string) string {
if r.rss == nil || len(r.rss) == 0 {
return ""
}
if (r.curIndex == nil){
r.curIndex = make(map[string]int,0)
r.curIndex[key] = 0
}
serviceList,ok := r.rss[key]
if !ok{
return ""
}
r.curIndex[key] = rand.Intn(len(serviceList))
return serviceList[r.curIndex[key]]
}
func (r *RandomBalance) Get(key ...string) (string, error) {
return r.Next(key[0]), nil
}
func (r *RandomBalance) Update() {
}
package load_balance
import (
"fmt"
"testing"
"time"
)
func TestRandomBalance(t *testing.T) {
rb := &RandomBalance{}
rb.Add("ichunt/abc","127.0.0.1:2003") //0
rb.Add("ichunt/abc","127.0.0.1:2003") //0
rb.Add("ichunt/abc","127.0.0.1:2003") //0
rb.Add("ichunt/abc","127.0.0.1:2003") //0
rb.Add("ichunt/abc","127.0.0.1:2004") //1
rb.Add("ichunt/abc","127.0.0.1:2004") //1
rb.Add("ichunt/abc","127.0.0.1:2005") //2
rb.Add("ichunt/abc","127.0.0.1:2005") //2
rb.Add("ichunt/abc","127.0.0.1:2005") //2
//
//
//rb.Add("ichunt/123","127.0.0.1:3003") //0
//rb.Add("ichunt/123","127.0.0.1:3004") //1
//rb.Add("ichunt/123","127.0.0.1:3005") //2
//
//rb.Add("ichunt/999","127.0.0.1:4003") //0
//rb.Add("ichunt/999","127.0.0.1:4004") //1
//rb.Add("ichunt/999","127.0.0.1:4005") //2
for{
fmt.Println(rb.Next("ichunt/abc"))
//fmt.Println(rb.Next("ichunt/123"))
//fmt.Println(rb.Next("ichunt/999"))
time.Sleep(time.Second*1)
}
}
package load_balance
import (
"errors"
"fmt"
"sync"
//_ "github.com/ichunt2019/ichunt-micro-service/registry"
"github.com/syyongx/php2go"
"strconv"
)
type RoundRobinBalance struct {
curIndex map[string]int
rss map[string][]string
lock sync.Mutex
}
func (r *RoundRobinBalance) Add(serviceName string,params ...string) error {
if len(params) == 0 {
return errors.New("param len 1 at least")
}
addr := params[0]
if (r.rss == nil){
r.rss = make(map[string][]string,0)
}
if (r.curIndex == nil){
r.curIndex = make(map[string]int,0)
}
if php2go.InArray(addr,r.rss[serviceName]) == false{
r.rss[serviceName] = append(r.rss[serviceName],addr)
r.curIndex[serviceName] = 0
}
return nil
}
func (r *RoundRobinBalance) Next(key string) string {
defer func() {
if r := recover(); r != nil {
fmt.Println("````````````````````")
return
}
}()
if r.rss == nil || len(r.rss) == 0 {
return ""
}
if _,ok := r.rss[key]; !ok{
return ""
}
lens := len(r.rss[key]) //节点的个数
if (r.curIndex == nil){
r.curIndex = make(map[string]int,0)
r.curIndex[key] = 0
}
if _,ok :=r.curIndex[key];!ok{
return ""
}
if r.curIndex[key] >= lens {
r.curIndex[key] = 0
}
php2go.array_
curAddr := r.rss[key][r.curIndex[key]]
r.curIndex[key] = (r.curIndex[key] + 1) % lens
return curAddr
}
func (r *RoundRobinBalance) Get(key ...string) (string, error) {
return r.Next(key[0]), nil
}
/*
更新负载均衡器重缓存服务的节点信息
*/
func (r *RoundRobinBalance) Update() {
r.lock.Lock()
defer r.lock.Unlock()
fmt.Println("更新负载均衡配置.....")
allServiceInfo := LoadBalanceConfig.GetLoadBalanceList()
if allServiceInfo == nil || allServiceInfo.ServiceMap == nil{
return
}
//删除不存在的服务节点信息
for serviceName,_ := range r.rss{
if _,ok := allServiceInfo.ServiceMap[serviceName];!ok{
delete(r.rss,serviceName)
continue
}
//循环etcd中的节点信息
var tmpNodes []string
for _,etcdServiceNode := range allServiceInfo.ServiceMap[serviceName].Nodes{
if etcdServiceNode.IP == ""{
continue
}
if etcdServiceNode.Port == 0 {
tmpNodes = append(tmpNodes,etcdServiceNode.IP)
}else{
tmpNodes = append(tmpNodes,fmt.Sprintf("%s:%s",etcdServiceNode.IP,strconv.Itoa(etcdServiceNode.Port)))
}
}
r.rss[serviceName] = tmpNodes
}
for _,service := range allServiceInfo.ServiceMap{
serviceName := service.Name
nodes := service.Nodes
for _,node :=range nodes{
if node.IP == ""{
continue
}
if node.Port == 0{
r.Add(serviceName,node.IP)
}else{
r.Add(serviceName,fmt.Sprintf("%s:%s",node.IP,strconv.Itoa(node.Port)))
}
}
}
}
package load_balance
import (
"fmt"
"testing"
"time"
)
func Test_main(t *testing.T) {
rb := &RoundRobinBalance{}
rb.Add("ichunt/abc","127.0.0.1:2003") //0
rb.Add("ichunt/abc","127.0.0.1:2003") //0
rb.Add("ichunt/abc","127.0.0.1:2003") //0
rb.Add("ichunt/abc","127.0.0.1:2004") //1
rb.Add("ichunt/abc","127.0.0.1:2004") //1
rb.Add("ichunt/abc","127.0.0.1:2005") //2
rb.Add("ichunt/abc","127.0.0.1:2005") //2
//rb.Add("ichunt/123","127.0.0.1:3003") //0
//rb.Add("ichunt/123","127.0.0.1:3004") //1
//rb.Add("ichunt/123","127.0.0.1:3005") //2
//
//rb.Add("ichunt/999","127.0.0.1:4003") //0
//rb.Add("ichunt/999","127.0.0.1:4004") //1
//rb.Add("ichunt/999","127.0.0.1:4005") //2
for{
fmt.Println(rb.Next("ichunt/abc"))
//fmt.Println(rb.Next("ichunt/123"))
//fmt.Println(rb.Next("ichunt/999"))
time.Sleep(time.Second*1)
}
}
package load_balance
import (
"errors"
"strconv"
)
type WeightRoundRobinBalance struct {
rss map[string][]*WeightNode
}
type WeightNode struct {
addr string
weight int //权重值
currentWeight int //节点当前权重
effectiveWeight int //有效权重
}
func (r *WeightRoundRobinBalance) Add(serviceName string,params ...string) error {
var (
isInsert bool
)
if (r.rss == nil){
r.rss = make(map[string][]*WeightNode,0)
}
if len(params) != 2 {
return errors.New("param len need 2")
}
weight, err := strconv.ParseInt(params[1], 10, 64)
if err != nil {
return err
}
node := &WeightNode{addr: params[0], weight: int(weight)}
node.effectiveWeight = node.weight
isInsert = true
if _,ok := r.rss[serviceName];ok{
for k,v:=range r.rss[serviceName]{
if v.addr == params[0]{
isInsert = false
r.rss[serviceName][k].weight = int(weight)
r.rss[serviceName][k].effectiveWeight = int(weight)
break
}
}
}
if isInsert{
r.rss[serviceName] = append(r.rss[serviceName],node)
}
return nil
}
func (r *WeightRoundRobinBalance) Next(key string) string {
total := 0
var best *WeightNode
if r.rss == nil {
return ""
}
if _,ok:=r.rss[key];!ok{
return ""
}
for i := 0; i < len(r.rss[key]); i++ {
w := r.rss[key][i]
//step 1 统计所有有效权重之和
total += w.effectiveWeight
//step 2 变更节点临时权重为的节点临时权重+节点有效权重
w.currentWeight += w.effectiveWeight
//step 3 有效权重默认与权重相同,通讯异常时-1, 通讯成功+1,直到恢复到weight大小
if w.effectiveWeight < w.weight {
w.effectiveWeight++
}
//step 4 选择最大临时权重点节点
if best == nil || w.currentWeight > best.currentWeight {
best = w
}
}
if best == nil {
return ""
}
//step 5 变更临时权重为 临时权重-有效权重之和
best.currentWeight -= total
return best.addr
}
func (r *WeightRoundRobinBalance) Get(key ...string) (string, error) {
return r.Next(key[0]), nil
}
func (r *WeightRoundRobinBalance) Update() {
//list := Init().GetLoadBalanceList()
//fmt.Println("更新负载均衡配置..start...")
//for _,serviceList := range list.ServiceMap{
// for _,node:=range serviceList.Nodes{
// fmt.Printf("服务名%s,服务列表%s",serviceList.Name,node.IP+":"+strconv.Itoa(node.Port))
// r.Add(serviceList.Name,node.IP+":"+strconv.Itoa(node.Port),strconv.Itoa(node.Weight))
// }
//}
//fmt.Println("更新负载均衡配置..end...")
}
package load_balance
import (
"fmt"
"testing"
"time"
)
func TestLB(t *testing.T) {
rb := &WeightRoundRobinBalance{}
rb.Add("ichunt/abc","127.0.0.1:2003","3") //0
rb.Add("ichunt/abc","127.0.0.1:2004","1") //1
rb.Add("ichunt/abc","127.0.0.1:2005","2") //2
rb.Add("ichunt/123","127.0.0.1:3003","4") //0
rb.Add("ichunt/123","127.0.0.1:3004","3") //1
rb.Add("ichunt/123","127.0.0.1:3005","2") //2
//
//rb.Add("ichunt/999","127.0.0.1:4003","4") //0
//rb.Add("ichunt/999","127.0.0.1:4004","3") //1
//rb.Add("ichunt/999","127.0.0.1:4005","2") //2
for{
//fmt.Println(rb.Get("ichunt/abc"))
rb.Get("ichunt/abc")
fmt.Println(rb.Next("ichunt/123"))
//fmt.Println(rb.Next("ichunt/999"))
time.Sleep(time.Second*1)
}
}
package etcd
import (
"context"
"encoding/json"
"fmt"
"github.com/coreos/etcd/clientv3"
"github.com/ichunt2019/ichunt-micro-service/proxy/load_balance"
"github.com/ichunt2019/ichunt-micro-service/registry"
"path"
"sync"
"sync/atomic"
"time"
)
const (
MaxServiceNum = 8
MaxSyncServiceInterval = time.Second * 5
)
//etcd 注册插件
type EtcdRegistry struct {
options *registry.Options//etcd配置信息
client *clientv3.Client
serviceCh chan *registry.Service//服务注册 -》 服务信息 节点信息 servicename node
value atomic.Value //缓存已经注册的服务节点信息
lock sync.Mutex
registryServiceMap map[string]*RegisterService//需要注册到etcd中的服务节点信息
}
//存放所有服务信息 存入atomic.Value 为了防止并发
//type AllServiceInfo struct {
// serviceMap map[string]*registry.Service //节点信息 servicename node
//}
type RegisterService struct {
id clientv3.LeaseID
service *registry.Service //节点信息 servicename node
registered bool
keepAliveCh <-chan *clientv3.LeaseKeepAliveResponse //etcd返回的续租的信息 false表示续租失败 续租应答
}
var (
//实例化etcd服务
etcdRegistry *EtcdRegistry = &EtcdRegistry{
serviceCh: make(chan *registry.Service, MaxServiceNum),
registryServiceMap: make(map[string]*RegisterService, MaxServiceNum),
}
)
//导入包立即执行函数
func init() {
allServiceInfo := &registry.AllServiceInfo{
ServiceMap: make(map[string]*registry.Service, MaxServiceNum),
}
//atomic.Value 原子操作 为了防止并发
etcdRegistry.value.Store(allServiceInfo)
//注册etcd插件
//map["etcd"] = etcdRegistry
registry.RegisterPlugin(etcdRegistry)
go etcdRegistry.run()
}
//插件的名字
func (e *EtcdRegistry) Name() string {
return "etcd"
}
//初始化
func (e *EtcdRegistry) Init(ctx context.Context, opts ...registry.Option) (err error) {
e.options = &registry.Options{}
for _, opt := range opts {
opt(e.options)
}
e.client, err = clientv3.New(clientv3.Config{
Endpoints: e.options.Addrs,
DialTimeout: e.options.Timeout,
})
if err != nil {
err = fmt.Errorf("init etcd failed, err:%v", err)
return
}
return
}
//服务注册 把服务名称 节点信息放入通道serviceCh
func (e *EtcdRegistry) Register(ctx context.Context, service *registry.Service) (err error) {
select {
case e.serviceCh <- service:
default:
err = fmt.Errorf("register chan is full")
return
}
return
}
//服务反注册
func (e *EtcdRegistry) Unregister(ctx context.Context, service *registry.Service) (err error) {
return
}
func (e *EtcdRegistry) run() {
ticker := time.NewTicker(MaxSyncServiceInterval)
for {
select {
//读取注册进来的节点信息(服务名 ip+端口)
//手动加载过来的服务
case service := <-e.serviceCh:
//读取已经注册的服务
registryService, ok := e.registryServiceMap[service.Name]
if ok {
//更新节点信息
for _, node := range service.Nodes {
registryService.service.Nodes = append(registryService.service.Nodes, node)
}
registryService.registered = false
break
}
//插入节点信息
registryService = &RegisterService{
service: service,
}
//需要注册到etcd中的服务信息
e.registryServiceMap[service.Name] = registryService
case <-ticker.C:
//定时(10秒钟) 更新缓存中的服务信息
//e.value = AllServiceInfo
e.syncServiceFromEtcd()
default:
//注册服务 并 续约(把手动传过的需要注册的服务 put到etcd中 续租 )
//续租应答
e.registerOrKeepAlive()
time.Sleep(time.Millisecond * 500)
}
}
}
/*
服务注册
服务续租
*/
func (e *EtcdRegistry) registerOrKeepAlive() {
//循环注册的节点信息
for _, registryService := range e.registryServiceMap {
//如果是存活的节点 节点续期
if registryService.registered {
//处理续租应答 如果续租失败 registered = false
e.keepAlive(registryService)
continue
}
//服务注册 并永久续约
//设置registered = true
e.registerService(registryService)
}
}
/*
处理续租应答的协程
如果续租失败 registered = false
*/
func (e *EtcdRegistry) keepAlive(registryService *RegisterService) {
select {
case resp := <-registryService.keepAliveCh:
if resp == nil {
//租约失效 去除节点
registryService.registered = false
return
}
}
return
}
func (e *EtcdRegistry) registerService(registryService *RegisterService) (err error) {
resp, err := e.client.Grant(context.TODO(), e.options.HeartBeat)
if err != nil {
return
}
registryService.id = resp.ID
for _, node := range registryService.service.Nodes {
tmp := &registry.Service{
Name: registryService.service.Name,
Nodes: []*registry.Node{
node,
},
}
data, err := json.Marshal(tmp)
if err != nil {
continue
}
key := e.serviceNodePath(tmp)
fmt.Printf("register key:%s\n", key)
_, err = e.client.Put(context.TODO(), key, string(data), clientv3.WithLease(resp.ID))
if err != nil {
continue
}
// 自动续租
//<-ch <-ch==nil 租约失效
ch, err := e.client.KeepAlive(context.TODO(), resp.ID)
if err != nil {
continue
}
registryService.keepAliveCh = ch //续租
registryService.registered = true
}
return
}
func (e *EtcdRegistry) serviceNodePath(service *registry.Service) string {
nodeIP := fmt.Sprintf("%s:%d", service.Nodes[0].IP, service.Nodes[0].Port)
return path.Join(e.options.RegistryPath, service.Name, nodeIP)
}
func (e *EtcdRegistry) servicePath(name string) string {
return path.Join(e.options.RegistryPath, name)
}
func (e *EtcdRegistry) getServiceFromCache(ctx context.Context,
name string) (service *registry.Service, ok bool) {
allServiceInfo := e.value.Load().(*registry.AllServiceInfo)
//一般情况下,都会从缓存中读取
service, ok = allServiceInfo.ServiceMap[name]
return
}
func (e *EtcdRegistry) GetService(ctx context.Context, name string) (service *registry.Service, err error) {
//如果缓存中没有这个service,则从etcd中读取
e.lock.Lock()
defer e.lock.Unlock()
service, ok := e.getServiceFromCache(ctx, name)
if ok {
//fmt.Println("从缓存中获取...")
return
}
//从etcd中读取指定服务名字的服务信息
key := e.servicePath(name)
resp, err := e.client.Get(ctx, key, clientv3.WithPrefix())
if err != nil {
return
}
service = &registry.Service{
Name: name,
}
for _, kv := range resp.Kvs {
value := kv.Value
var tmpService registry.Service
err = json.Unmarshal(value, &tmpService)
if err != nil {
return
}
for _, node := range tmpService.Nodes {
if tmpService.Name != name{
continue
}
service.Nodes = append(service.Nodes, node)
}
}
//从内存中读取所有的服务列表
allServiceInfoOld := e.value.Load().(*registry.AllServiceInfo)
//声明新的缓存服务
var allServiceInfoNew = &registry.AllServiceInfo{
ServiceMap: make(map[string]*registry.Service, MaxServiceNum),
}
//copy旧的服务列表信息到新的变量中
for key, val := range allServiceInfoOld.ServiceMap {
allServiceInfoNew.ServiceMap[key] = val
}
//更新当前需要获取的服务列表
allServiceInfoNew.ServiceMap[name] = service
e.value.Store(allServiceInfoNew)
return
}
/*
读取缓存从etcd中拉取服务节点信息 并更新
*/
func (e *EtcdRegistry) syncServiceFromEtcd() {
//fmt.Println("更新缓存的服务列表...")
var allServiceInfoNew = &registry.AllServiceInfo{
ServiceMap: make(map[string]*registry.Service, MaxServiceNum),
}
ctx := context.TODO()
allServiceInfo := e.value.Load().(*registry.AllServiceInfo)
//对于缓存的每一个服务,都需要从etcd中进行更新
for _, service := range allServiceInfo.ServiceMap {
key := e.servicePath(service.Name)
resp, err := e.client.Get(ctx, key, clientv3.WithPrefix())
if err != nil {
allServiceInfoNew.ServiceMap[service.Name] = service
continue
}
serviceNew := &registry.Service{
Name: service.Name,
}
for _, kv := range resp.Kvs {
value := kv.Value
var tmpService registry.Service
err = json.Unmarshal(value, &tmpService)
if err != nil {
fmt.Printf("unmarshal failed, err:%v value:%s", err, string(value))
return
}
if tmpService.Name != service.Name{
continue
}
for _, node := range tmpService.Nodes {
serviceNew.Nodes = append(serviceNew.Nodes, node)
}
}
allServiceInfoNew.ServiceMap[serviceNew.Name] = serviceNew
}
e.value.Store(allServiceInfoNew)
load_balance.LoadBalanceConfig.UpdateConf(allServiceInfoNew)
}
package etcd
import (
"context"
"fmt"
"testing"
"time"
"github.com/ichunt2019/ichunt-micro-service/registry"
"github.com/ichunt2019/ichunt-micro-service/proxy/load_balance"
)
func TestRegister(t *testing.T) {
//初始化注册中心 注册etcd 服务中心
registryInst, err := registry.InitRegistry(context.TODO(), "etcd",
registry.WithAddrs([]string{"192.168.2.232:2379"}),
registry.WithTimeout(time.Second),
registry.WithRegistryPath("/ichuntMicroService/"),
registry.WithHeartBeat(5),
)
if err != nil {
t.Errorf("init registry failed, err:%v", err)
return
}
load_balance.Init(registryInst)
service := &registry.Service{
Name: "comment_service",
}
service.Nodes = append(service.Nodes, &registry.Node{
IP: "127.0.0.1",
Port: 8801,
},
&registry.Node{
IP: "127.0.0.2",
Port: 8801,
},
)
registryInst.Register(context.TODO(), service)
//for{
// go func() {
// loadBalance.GetService(context.TODO(), "comment_service")
// //loadBalance.GetService(context.TODO(), "comment_service22222")
// }()
// time.Sleep(time.Millisecond*5)
//}
forerver := make(chan struct{})
<-forerver
}
func TestRegister2(t *testing.T) {
//初始化注册中心 注册etcd 服务中心
registryInst, err := registry.InitRegistry(context.TODO(), "etcd",
registry.WithAddrs([]string{"192.168.2.232:2379"}),
registry.WithTimeout(time.Second),
registry.WithRegistryPath("/ichuntMicroService/"),
registry.WithHeartBeat(5),
)
if err != nil {
t.Errorf("init registry failed, err:%v", err)
return
}
load_balance.Init(registryInst)
service := &registry.Service{
Name: "comment_service",
}
service.Nodes = append(service.Nodes, &registry.Node{
IP: "127.0.0.3",
Port: 8801,
},
&registry.Node{
IP: "127.0.0.4",
Port: 8801,
},
)
registryInst.Register(context.TODO(), service)
forerver := make(chan struct{})
<-forerver
}
func TestGetService(t *testing.T){
//初始化注册中心 注册etcd 服务中心
registryInst, err := registry.InitRegistry(context.TODO(), "etcd",
registry.WithAddrs([]string{"192.168.2.232:2379"}),
registry.WithTimeout(time.Second),
registry.WithRegistryPath("/ichuntMicroService/"),
registry.WithHeartBeat(5),
)
if err != nil {
fmt.Printf("init registry failed, err:%v", err)
return
}
loadBalance := load_balance.Init(registryInst)
for{
go func() {
loadBalance.GetService(context.TODO(), "comment_service")
loadBalance.GetService(context.TODO(), "comment_service22222")
}()
//time.Sleep(time.Second*1)
time.Sleep(time.Millisecond*10)
}
}
package registry
import (
"time"
)
type Options struct {
Addrs []string
Timeout time.Duration
// example: /xxx_company/app/kuaishou/service_A/10.192.1.1:8801
// example: /xxx_company/app/kuaishou/service_A/10.192.1.2:8801
RegistryPath string
HeartBeat int64
}
type Option func(opts *Options)
func WithTimeout(timeout time.Duration) Option {
return func(opts *Options) {
opts.Timeout = timeout
}
}
func WithAddrs(addrs []string) Option {
return func(opts *Options) {
opts.Addrs = addrs
}
}
func WithRegistryPath(path string) Option {
return func(opts *Options) {
opts.RegistryPath = path
}
}
func WithHeartBeat(heartHeat int64) Option {
return func(opts *Options) {
opts.HeartBeat = heartHeat
}
}
package registry
import (
"context"
"fmt"
"sync"
)
var (
pluginMgr = &PluginMgr{
plugins: make(map[string]Registry),
}
)
type PluginMgr struct {
plugins map[string]Registry
lock sync.Mutex
}
func (p *PluginMgr) registerPlugin(plugin Registry) (err error) {
p.lock.Lock()
defer p.lock.Unlock()
_, ok := p.plugins[plugin.Name()]
if ok {
err = fmt.Errorf("duplicate registry plugin")
return
}
p.plugins[plugin.Name()] = plugin
return
}
func (p *PluginMgr) initRegistry(ctx context.Context, name string,
opts ...Option) (registry Registry, err error) {
//查找对应的插件是否存在
p.lock.Lock()
defer p.lock.Unlock()
plugin, ok := p.plugins[name]
if !ok {
err = fmt.Errorf("plugin %s not exists", name)
return
}
registry = plugin
err = plugin.Init(ctx, opts...)
return
}
// 注册插件
func RegisterPlugin(registry Registry) (err error) {
return pluginMgr.registerPlugin(registry)
}
// 初始化注册中心
func InitRegistry(ctx context.Context, name string, opts ...Option) (registry Registry, err error) {
return pluginMgr.initRegistry(ctx, name, opts...)
}
package registry
import (
"context"
)
// 服务注册插件的接口
type Registry interface {
//插件的名字
Name() string
//初始化
Init(ctx context.Context, opts ...Option) (err error)
//服务注册
Register(ctx context.Context, service *Service) (err error)
//服务反注册
Unregister(ctx context.Context, service *Service) (err error)
//服务发现:通过服务的名字获取服务的位置信息(ip和port列表)
GetService(ctx context.Context, name string) (service *Service, err error)
}
package registry
// 服务抽象
type Service struct {
Name string `json:"name"`
Nodes []*Node `json:"nodes"`
}
// 服务节点的抽象
type Node struct {
Id string `json:"id"`
IP string `json:"ip"`
Port int `json:"port"`
Weight int `json:"weight"`
}
type AllServiceInfo struct {
ServiceMap map[string]*Service //节点信息 servicename node
}
package main
import (
"bytes"
"github.com/ichunt2019/ichunt-micro-service/proxy/load_balance"
"io/ioutil"
"log"
"net"
"net/http"
"net/http/httputil"
"net/url"
"strconv"
"strings"
"time"
)
var (
addr = "127.0.0.1:2002"
transport = &http.Transport{
DialContext: (&net.Dialer{
Timeout: 30 * time.Second, //连接超时
KeepAlive: 30 * time.Second, //长连接超时时间
}).DialContext,
MaxIdleConns: 100, //最大空闲连接
IdleConnTimeout: 90 * time.Second, //空闲超时时间
TLSHandshakeTimeout: 10 * time.Second, //tls握手超时时间
ExpectContinueTimeout: 1 * time.Second, //100-continue状态码超时时间
}
)
func NewMultipleHostsReverseProxy(lb load_balance.LoadBalance) *httputil.ReverseProxy {
//请求协调者
director := func(req *http.Request) {
nextAddr, err := lb.Get(req.RemoteAddr)
if err != nil {
log.Fatal("get next addr fail")
}
target, err := url.Parse(nextAddr)
if err != nil {
log.Fatal(err)
}
targetQuery := target.RawQuery
req.URL.Scheme = target.Scheme
req.URL.Host = target.Host
req.URL.Path = singleJoiningSlash(target.Path, req.URL.Path)
if targetQuery == "" || req.URL.RawQuery == "" {
req.URL.RawQuery = targetQuery + req.URL.RawQuery
} else {
req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery
}
if _, ok := req.Header["User-Agent"]; !ok {
req.Header.Set("User-Agent", "user-agent")
}
//只在第一代理中设置此header头
req.Header.Set("X-Real-Ip", req.RemoteAddr)
}
//更改内容
modifyFunc := func(resp *http.Response) error {
//请求以下命令:curl 'http://127.0.0.1:2002/error'
if resp.StatusCode != 200 {
//获取内容
oldPayload, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
//追加内容
newPayload := []byte("StatusCode error:" + string(oldPayload))
resp.Body = ioutil.NopCloser(bytes.NewBuffer(newPayload))
resp.ContentLength = int64(len(newPayload))
resp.Header.Set("Content-Length", strconv.FormatInt(int64(len(newPayload)), 10))
}
return nil
}
//错误回调 :关闭real_server时测试,错误回调
//范围:transport.RoundTrip发生的错误、以及ModifyResponse发生的错误
errFunc := func(w http.ResponseWriter, r *http.Request, err error) {
//todo 如果是权重的负载则调整临时权重
http.Error(w, "ErrorHandler error:"+err.Error(), 500)
}
return &httputil.ReverseProxy{Director: director, Transport: transport, ModifyResponse: modifyFunc, ErrorHandler: errFunc}
}
func singleJoiningSlash(a, b string) string {
aslash := strings.HasSuffix(a, "/")
bslash := strings.HasPrefix(b, "/")
switch {
case aslash && bslash:
return a + b[1:]
case !aslash && !bslash:
return a + "/" + b
}
return a + b
}
func main() {
rb := load_balance.LoadBanlanceFactory(load_balance.LbWeightRoundRobin)
if err := rb.Add("http://127.0.0.1:2003/", "10"); err != nil {
log.Println(err)
}
if err := rb.Add("http://127.0.0.1:2004/", "20"); err != nil {
log.Println(err)
}
proxy := NewMultipleHostsReverseProxy(rb)
log.Println("Starting httpserver at " + addr)
log.Fatal(http.ListenAndServe(addr, proxy))
}
package main
import (
"bytes"
"io/ioutil"
"log"
"math/rand"
"net"
"net/http"
"net/http/httputil"
"net/url"
"regexp"
"strconv"
"strings"
"time"
)
var addr = "127.0.0.1:2001"
func main() {
rs1 := "http://127.0.0.1:2002"
url1, err1 := url.Parse(rs1)
if err1 != nil {
log.Println(err1)
}
urls := []*url.URL{url1}
proxy := NewMultipleHostsReverseProxy(urls)
log.Println("Starting httpserver at " + addr)
log.Fatal(http.ListenAndServe(addr, proxy))
}
var transport = &http.Transport{
DialContext: (&net.Dialer{
Timeout: 30 * time.Second, //连接超时
KeepAlive: 30 * time.Second, //长连接超时时间
}).DialContext,
MaxIdleConns: 100, //最大空闲连接
IdleConnTimeout: 90 * time.Second, //空闲超时时间
TLSHandshakeTimeout: 10 * time.Second, //tls握手超时时间
ExpectContinueTimeout: 1 * time.Second, //100-continue 超时时间
}
func NewMultipleHostsReverseProxy(targets []*url.URL) *httputil.ReverseProxy {
//请求协调者
director := func(req *http.Request) {
//url_rewrite
//127.0.0.1:2002/dir/abc ==> 127.0.0.1:2003/base/abc ??
//127.0.0.1:2002/dir/abc ==> 127.0.0.1:2002/abc
//127.0.0.1:2002/abc ==> 127.0.0.1:2003/base/abc
re, _ := regexp.Compile("^/dir(.*)");
req.URL.Path = re.ReplaceAllString(req.URL.Path, "$1")
//随机负载均衡
targetIndex := rand.Intn(len(targets))
target := targets[targetIndex]
targetQuery := target.RawQuery
req.URL.Scheme = target.Scheme
req.URL.Host = target.Host
// url地址重写:重写前:/aa 重写后:/base/aa
req.URL.Path = singleJoiningSlash(target.Path, req.URL.Path)
if targetQuery == "" || req.URL.RawQuery == "" {
req.URL.RawQuery = targetQuery + req.URL.RawQuery
} else {
req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery
}
if _, ok := req.Header["User-Agent"]; !ok {
req.Header.Set("User-Agent", "user-agent")
}
//只在第一代理中设置此header头
req.Header.Set("X-Real-Ip", req.RemoteAddr)
}
//更改内容
modifyFunc := func(resp *http.Response) error {
//请求以下命令:curl 'http://127.0.0.1:2002/error'
if resp.StatusCode != 200 {
//获取内容
oldPayload, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
//追加内容
newPayload := []byte("StatusCode error:" + string(oldPayload))
resp.Body = ioutil.NopCloser(bytes.NewBuffer(newPayload))
resp.ContentLength = int64(len(newPayload))
resp.Header.Set("Content-Length", strconv.FormatInt(int64(len(newPayload)), 10))
}
return nil
}
//错误回调 :关闭real_server时测试,错误回调
errFunc := func(w http.ResponseWriter, r *http.Request, err error) {
http.Error(w, "ErrorHandler error:"+err.Error(), 500)
}
return &httputil.ReverseProxy{
Director: director,
Transport: transport,
ModifyResponse: modifyFunc,
ErrorHandler: errFunc}
}
func singleJoiningSlash(a, b string) string {
aslash := strings.HasSuffix(a, "/")
bslash := strings.HasPrefix(b, "/")
switch {
case aslash && bslash:
return a + b[1:]
case !aslash && !bslash:
return a + "/" + b
}
return a + b
}
package main
import (
"context"
"fmt"
"github.com/ichunt2019/ichunt-micro-service/registry"
_ "github.com/ichunt2019/ichunt-micro-service/registry/etcd"
"io"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"time"
)
func main() {
rs1 := &RealServer{Addr: "127.0.0.1:2003"}
rs1.Run()
rs2 := &RealServer{Addr: "127.0.0.1:2004"}
rs2.Run()
//服务注册
register()
//监听关闭信号
quit := make(chan os.Signal)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
}
type RealServer struct {
Addr string
}
func register(){
registryInst, err := registry.InitRegistry(context.TODO(), "etcd",
registry.WithAddrs([]string{"192.168.2.232:2379"}),
registry.WithTimeout(time.Second),
registry.WithRegistryPath("/ichuntMicroService/"),
registry.WithHeartBeat(5),
)
if err != nil {
fmt.Printf("init registry failed, err:%v", err)
return
}
service := &registry.Service{
Name: "comment_service",
}
service.Nodes = append(service.Nodes, &registry.Node{
IP: "127.0.0.1",
Port: 2003,
},
&registry.Node{
IP: "127.0.0.1",
Port: 2004,
},
)
registryInst.Register(context.TODO(), service)
}
func (r *RealServer) Run() {
log.Println("Starting httpserver at " + r.Addr)
mux := http.NewServeMux()
mux.HandleFunc("/", r.HelloHandler)
mux.HandleFunc("/base/error", r.ErrorHandler)
server := &http.Server{
Addr: r.Addr,
WriteTimeout: time.Second * 3,
Handler: mux,
}
go func() {
log.Fatal(server.ListenAndServe())
}()
}
func (r *RealServer) HelloHandler(w http.ResponseWriter, req *http.Request) {
//127.0.0.1:8008/abc?sdsdsa=11
//r.Addr=127.0.0.1:8008
//req.URL.Path=/abc
fmt.Println(req.Host)
upath := fmt.Sprintf("http://%s%s\n", r.Addr, req.URL.Path)
realIP := fmt.Sprintf("RemoteAddr=%s,X-Forwarded-For=%v,X-Real-Ip=%v\n", req.RemoteAddr, req.Header.Get("X-Forwarded-For"),
req.Header.Get("X-Real-Ip"))
io.WriteString(w, upath)
io.WriteString(w, realIP)
}
func (r *RealServer) ErrorHandler(w http.ResponseWriter, req *http.Request) {
upath := "error handler"
w.WriteHeader(500)
io.WriteString(w, upath)
}
\ No newline at end of file
package main
import (
"bytes"
"compress/gzip"
"io/ioutil"
"log"
"math/rand"
"net"
"net/http"
"net/http/httputil"
"net/url"
"regexp"
"strconv"
"strings"
"time"
)
var addr = "127.0.0.1:2002"
func main() {
//rs1 := "http://www.baidu.com"
rs1 := "http://127.0.0.1:2003"
url1, err1 := url.Parse(rs1)
if err1 != nil {
log.Println(err1)
}
//rs2 := "http://www.baidu.com"
rs2 := "http://127.0.0.1:2004"
url2, err2 := url.Parse(rs2)
if err2 != nil {
log.Println(err2)
}
urls := []*url.URL{url1, url2}
proxy := NewMultipleHostsReverseProxy(urls)
log.Println("Starting httpserver at " + addr)
log.Fatal(http.ListenAndServe(addr, proxy))
}
var transport = &http.Transport{
DialContext: (&net.Dialer{
Timeout: 30 * time.Second, //连接超时
KeepAlive: 30 * time.Second, //长连接超时时间
}).DialContext,
MaxIdleConns: 100, //最大空闲连接
IdleConnTimeout: 90 * time.Second, //空闲超时时间
TLSHandshakeTimeout: 10 * time.Second, //tls握手超时时间
ExpectContinueTimeout: 1 * time.Second, //100-continue 超时时间
}
func NewMultipleHostsReverseProxy(targets []*url.URL) *httputil.ReverseProxy {
//请求协调者
director := func(req *http.Request) {
//url_rewrite
//127.0.0.1:2002/dir/abc ==> 127.0.0.1:2003/base/abc ??
//127.0.0.1:2002/dir/abc ==> 127.0.0.1:2002/abc
//127.0.0.1:2002/abc ==> 127.0.0.1:2003/base/abc
re, _ := regexp.Compile("^/dir(.*)");
req.URL.Path = re.ReplaceAllString(req.URL.Path, "$1")
//随机负载均衡
targetIndex := rand.Intn(len(targets))
target := targets[targetIndex]
targetQuery := target.RawQuery
req.URL.Scheme = target.Scheme
req.URL.Host = target.Host
//todo 部分章节补充1
//todo 当对域名(非内网)反向代理时需要设置此项。当作后端反向代理时不需要
req.Host = target.Host
// url地址重写:重写前:/aa 重写后:/base/aa
req.URL.Path = singleJoiningSlash(target.Path, req.URL.Path)
if targetQuery == "" || req.URL.RawQuery == "" {
req.URL.RawQuery = targetQuery + req.URL.RawQuery
} else {
req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery
}
if _, ok := req.Header["User-Agent"]; !ok {
req.Header.Set("User-Agent", "user-agent")
}
//只在第一代理中设置此header头
//req.Header.Set("X-Real-Ip", req.RemoteAddr)
}
//更改内容
modifyFunc := func(resp *http.Response) error {
//请求以下命令:curl 'http://127.0.0.1:2002/error'
//todo 部分章节功能补充2
//todo 兼容websocket
if strings.Contains(resp.Header.Get("Connection"), "Upgrade") {
return nil
}
var payload []byte
var readErr error
//todo 部分章节功能补充3
//todo 兼容gzip压缩
if strings.Contains(resp.Header.Get("Content-Encoding"), "gzip") {
gr, err := gzip.NewReader(resp.Body)
if err != nil {
return err
}
payload, readErr = ioutil.ReadAll(gr)
resp.Header.Del("Content-Encoding")
} else {
payload, readErr = ioutil.ReadAll(resp.Body)
}
if readErr != nil {
return readErr
}
//异常请求时设置StatusCode
if resp.StatusCode != 200 {
payload = []byte("StatusCode error:" + string(payload))
}
//todo 部分章节功能补充4
//todo 因为预读了数据所以内容重新回写
resp.Body = ioutil.NopCloser(bytes.NewBuffer(payload))
resp.ContentLength = int64(len(payload))
resp.Header.Set("Content-Length", strconv.FormatInt(int64(len(payload)), 10))
return nil
}
//错误回调 :关闭real_server时测试,错误回调
errFunc := func(w http.ResponseWriter, r *http.Request, err error) {
http.Error(w, "ErrorHandler error:"+err.Error(), 500)
}
return &httputil.ReverseProxy{
Director: director,
Transport: transport,
ModifyResponse: modifyFunc,
ErrorHandler: errFunc}
}
func singleJoiningSlash(a, b string) string {
aslash := strings.HasSuffix(a, "/")
bslash := strings.HasPrefix(b, "/")
switch {
case aslash && bslash:
return a + b[1:]
case !aslash && !bslash:
return a + "/" + b
}
return a + b
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment