Commit 6506d206 by 杨树贤
parents 484dc36b 7e73c092
Showing with 4755 additions and 2 deletions

Too many changes to show.

To preserve performance only 1000 of 1000+ files are displayed.

......@@ -7,6 +7,7 @@ import (
"go_sku_server/boot"
"go_sku_server/pkg/config"
"go_sku_server/routes"
service2 "go_sku_server/service"
)
func main() {
......@@ -26,11 +27,20 @@ func main() {
web.Handler(r),
web.Address(":"+port),
)
service2.Supplier.GetSupplierForbidMap()
////初始化禁用规则
//go func() {
// for true {
// service2.Supplier.GetSupplierForbidMap()
// time.Sleep(3600 * time.Second)
// }
//}()
if err := service.Init(); err != nil {
panic(err)
}
if err := service.Run(); err != nil {
panic(err)
}
}
;总配置信息
[web]
port = 60014
mode = release
[message]
api_domain = http://api.ichunt.com/msg/sendMessageByAuto
api_md5_str = fh6y5t4rr351d2c3bryi
;钉钉配置信息
[DINGDING]
SEARCH_API_MONITOR = 6d0fa85e01a02c39347d011ae973fd21b76c6c7ce582d3ea470c6b65a318848d
;存放数据库连接信息
[xorm]
ShowSQL = false
[spu]
user_name = LxiCSpu
password = `Mysx3Tyzlo00oxlmlly`
host = 172.18.137.19
database = liexin_spu
table_prefix =lie_
type = mysql
[liexin_data]
user_name = LxDDUsedRead
password = `0o9u0Ux2oAoYddflmxXtZss`
host = 172.18.137.33
database = liexin_data
table_prefix =lie_
type = mysql
[supp]
user_name = SupDbUser
password = `Supssy2@@!!@$#yxy`
host = 172.18.137.21
database = liexin_supp
table_prefix =lie_
type = mysql
[cms]
user_name = dtuser
password = `dAtaL#ym2902m2lLX2y33`
host = appdb-master.ichunt.db
database = icdata
table_prefix =
type = mysql
; 比如 sku_save,5000 路径 sku_save 即文件夹是 sku 文件名类似 是save_2020-12-10.log,5000代表队列的容量为5000
[log_config]
1=lysku_save,5000
2=zysku_save,5000
3=sku_query,5000
4=default_sku,5000
;存放mongodb连接信息
[mongo]
host = 172.18.137.23:27017
username = "ichunt"
password = "huntmon66499"
database = ichunt
maxPoolSize=1000
[pre_sku_mongo]
host = 172.18.137.35:27016
username = "ichunt"
password = "huntmon66499"
database = ichunt
maxPoolSize=8000
;存放rabmq连接信息
[rabmq]
url = amqp://huntmouser:jy2y2900@119.23.79.136:5672/
;存放本系统所有的队列名称
[rabmq_all]
;redis连接信息
[default_redis_read]
host = 172.18.137.38:6379
password = icDb29mLy2s
max_idle = 200
max_active = 5000
idle_timeout = 5
[default_redis_write]
host = 172.18.137.38:6379
password = icDb29mLy2s
max_idle = 200
max_active = 5000
idle_timeout = 5
[default_redis_spu]
host = spu-redis.ichunt.db:6379
password = icDb29mLy1s
max_idle = 200
max_active = 5000
idle_timeout = 5
;存放redis所有键
[redis_all]
......@@ -35,7 +35,8 @@ require (
github.com/smartystreets/goconvey v1.6.4 // indirect
github.com/stretchr/testify v1.5.1 // indirect
github.com/syyongx/php2go v0.9.4
github.com/tidwall/gjson v1.6.1
github.com/tidwall/gjson v1.14.2
github.com/tidwall/sjson v1.2.5 // indirect
go.uber.org/zap v1.14.1 // indirect
golang.org/x/sys v0.0.0-20201204225414-ed752295db88 // indirect
golang.org/x/text v0.3.3 // indirect
......
......@@ -92,6 +92,7 @@ type LySku struct {
Tariff float64 `json:"tariff"`
TariffFormat string `json:"tariff_format"`
AbilityLevel int `json:"ability_level"`
LabelOp int `json:"label_op"`
BrandPack string `json:"brand_pack"`
OnwayStock int `json:"onway_stock"`
CompareRatio float64 `json:"compare_ratio"`
......
package model
// 供应商禁用信息map
type SupplierForbidMap map[string]ForbidInfo
// 供应商禁止信息,会转换一次
type ForbidInfo struct {
SupplierId int `json:"supplier_id"` // 供应商id
StandardBrandIds []int // 禁止标准品牌列表
Eccns []string // 禁止eccn列表
GoodsNames []string // 禁止型号列表
SkuIds []string // 禁止sku_ids
SpuIds []string // 禁止spu_ids
ClassId1 []string //
ClassId2 []string //
EccnNo []string // 白名单 eccn
}
// 供应商禁止信息, redis中存储的
type RedisForbidInfo struct {
SupplierId int `json:"supplier_id"` // 供应商id
StandardBrandId string `json:"standard_brand_id"` // 禁止标准品牌,多个用,分割 8,130
Eccn string `json:"eccn"` // 禁止eccn ,多个用,分割 %3A001%,%3A292%
GoodsName string `json:"goods_name"` // 禁止型号,多个用 € 分割
SkuId string `json:"sku_id"` // 禁止sku_id多个用 € 分割
SpuId string `json:"spu_id"` // 禁止spu_id多个用 € 分割
ClassId1 string `json:"class_id1"`
ClassId2 string `json:"class_id2"`
EccnNo string `json:"eccn_no"` // 白名单 eccn
}
......@@ -27,6 +27,49 @@ import (
"github.com/tidwall/gjson"
)
func GetMd5KeyUpper(str string) string {
lowerStr := strings.ToUpper(str)
// 创建 MD5 散列对象
hash := md5.New()
// 计算字符串的 MD5 散列值
hash.Write([]byte(lowerStr))
// 获取计算后的 MD5 散列值
hashed := hash.Sum(nil)
// 将散列值转换为十六进制字符串表示
md5Str := hex.EncodeToString(hashed)
return md5Str
}
// 判断是否在字符串数组切片中
func InStringArray(needle string, haystack []string) bool {
for _, item := range haystack {
if item == needle {
return true
}
}
return false
}
func InIntArray(target int, arr []int) bool {
sort.Ints(arr)
index := sort.SearchInts(arr, target)
return index < len(arr) && arr[index] == target
}
// ReplaceSpecialCharts 替换特殊字符
func ReplaceSpecialCharts(str string) string {
reg := regexp.MustCompile(`[^A-Za-z0-9]+`)
return reg.ReplaceAllString(str, "")
}
func GetUpperAndTrimStr(str string) string {
// 使用 Fields 方法将字符串拆分为单词,并重新组合
words := strings.Fields(str)
filteredStr := strings.Join(words, "")
upperStr := strings.ToUpper(filteredStr)
return upperStr
}
/*
gjson 判断某值是否存在 map
@param json string 分析json字符串
......
......@@ -19,6 +19,62 @@ import (
type ActivityService struct {
}
/*
控制前端3.0购买 加入购物车 联系销售等按钮
https://www.tapd.cn/tapd_fe/20225591/story/detail/1120225591001012052
todo 1 加入购物车、立即购买 :
6项条件须同时满足
是否属于网站屏蔽范围:不属于
商品状态:上架
商品是否过期:否
库存数量:不为空,且,>0,且,≥起订量
商品价格:不为空,且,>0(人民币、美金价格至少有一个满足)
履约程度:强履约
todo 2 加入询价池、立即询价
6项条件须同时满足
是否属于网站屏蔽范围:不属于
商品状态:上架
商品是否过期:否
库存数量:不为空,且,>0,且,≥起订量
商品价格:不为空,且,>0(人民币、美金价格至少有一个满足)
履约程度:弱履约
todo 3 联系销售
规则1 且 规则2 不符合
return label_op 操作按钮,1加入购物车(立即购买) 2 加入询价池 3联系销售
*/
func (as *ActivityService) GetLabelOp(sku model.LySku) (op int) {
var taxCheck map[string]map[string]string
if sku.SupplierId == 7 {
taxCheck = Supplier.GetTaxMap([]string{sku.GoodsName}, "3C目录内")
}
forbidStatus, _ := Supplier.GetSkuForbidStatus(sku.SupplierId, sku.StandardBrand.StandardBrandId, sku.GoodsName, sku.GoodsId, sku.Eccn, sku.SpuId, sku.Canal, gconv.String(sku.ClassID1), gconv.String(sku.ClassID2), taxCheck)
op = 3 //联系销售
if sku.GoodsStatus == 1 && sku.IsExpire == 0 && forbidStatus == 0 && sku.Stock > 0 && len(sku.LadderPrice) > 0 && sku.Stock >= sku.Moq {
switch sku.AbilityLevel { //履约级别:-1 无 0:弱履约,1:中履约;2:强履约
case -1:
op = 3
break
case 0:
op = 2
break
case 1:
op = 2
break
case 2:
op = 1 //可购买
break
}
}
return op
}
// 获取活动信息,目前是包括促销活动(系数打折)以及满赠活动
func (as *ActivityService) GetActivityData(checkData model.ActivityCheckData) (priceActivity model.PriceActivity, giftActivity model.GiftActivity) {
supplierId := checkData.SupplierId
......
......@@ -73,6 +73,7 @@ func (ls *LyService) LyGoodsDetail(ctx context.Context, params RequestParams, go
skuArr := gredis.Hmget("default_r", "sku", goodsIds)
//为了性能着想,这边也先去批量获取spu的信息
var spuService SpuService
var activityService ActivityService
spuList := spuService.getSpuList(skuArr)
GoodsRes := sync.Map{}
......@@ -331,6 +332,8 @@ func (ls *LyService) LyGoodsDetail(ctx context.Context, params RequestParams, go
if has {
sku.Stock = int64(limitStock)
}
//计算按钮
sku.LabelOp = activityService.GetLabelOp(sku)
//最后一步,将sku的全部信息放到有序map里面
GoodsRes.Store(goodsId, sku)
......
......@@ -2,6 +2,7 @@ package service
import (
"context"
"github.com/gogf/gf/util/gconv"
"go_sku_server/model"
"go_sku_server/pkg/common"
"go_sku_server/pkg/gredis"
......@@ -126,10 +127,15 @@ func (qs *ZiyingService) ZyGoodsDetail(ctx context.Context, params RequestParams
spuId := gjson.Get(info, "spu_id").Int()
var standardBrand model.StandardBrand
var brandId int64
var spuClassId1 int64
var spuClassId2 int64
var brandName string
var activityService ActivityService
if spuId != 0 {
spuStr, _ := gredis.String(redisConnSpu.Do("HGET", "spu", spuId))
brandId = gjson.Get(spuStr, "brand_id").Int()
spuClassId1 = gjson.Get(spuStr, "class_id1").Int()
spuClassId2 = gjson.Get(spuStr, "class_id2").Int()
brandName, _ = gredis.String(redisConn.Do("HGET", "brand", brandId))
var ly LyService
standardBrand = ly.GetStandardBrand(brandId)
......@@ -353,6 +359,22 @@ func (qs *ZiyingService) ZyGoodsDetail(ctx context.Context, params RequestParams
A.Set("canal", "L0003270") //自营写死编码
//todo 计算购买按钮
nowstock, _ := A.Get("stock")
var lySku = model.LySku{
GoodsId: goodsId,
GoodsStatus: gjson.Get(info, "status").Int(),
Stock: gconv.Int64(nowstock),
AbilityLevel: 2,
StandardBrand: standardBrand,
GoodsName: gjson.Get(info, "goods_name").String(),
SpuId: gconv.String(spuId),
ClassID1: gconv.Int(spuClassId1),
ClassID2: gconv.Int(spuClassId2),
LadderPrice: ladderPrice,
}
A.Set("label_op", activityService.GetLabelOp(lySku))
//最后写入sync map
(GoodsRes).Store(goodsId, A)
}
......
TAGS
tags
.*.swp
tomlcheck/tomlcheck
toml.test
language: go
go:
- 1.1
- 1.2
- 1.3
- 1.4
- 1.5
- 1.6
- tip
install:
- go install ./...
- go get github.com/BurntSushi/toml-test
script:
- export PATH="$PATH:$HOME/gopath/bin"
- make test
Compatible with TOML version
[v0.4.0](https://github.com/toml-lang/toml/blob/v0.4.0/versions/en/toml-v0.4.0.md)
The MIT License (MIT)
Copyright (c) 2013 TOML authors
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
install:
go install ./...
test: install
go test -v
toml-test toml-test-decoder
toml-test -encoder toml-test-encoder
fmt:
gofmt -w *.go */*.go
colcheck *.go */*.go
tags:
find ./ -name '*.go' -print0 | xargs -0 gotags > TAGS
push:
git push origin master
git push github master
## TOML parser and encoder for Go with reflection
TOML stands for Tom's Obvious, Minimal Language. This Go package provides a
reflection interface similar to Go's standard library `json` and `xml`
packages. This package also supports the `encoding.TextUnmarshaler` and
`encoding.TextMarshaler` interfaces so that you can define custom data
representations. (There is an example of this below.)
Spec: https://github.com/toml-lang/toml
Compatible with TOML version
[v0.4.0](https://github.com/toml-lang/toml/blob/master/versions/en/toml-v0.4.0.md)
Documentation: https://godoc.org/github.com/BurntSushi/toml
Installation:
```bash
go get github.com/BurntSushi/toml
```
Try the toml validator:
```bash
go get github.com/BurntSushi/toml/cmd/tomlv
tomlv some-toml-file.toml
```
[![Build Status](https://travis-ci.org/BurntSushi/toml.svg?branch=master)](https://travis-ci.org/BurntSushi/toml) [![GoDoc](https://godoc.org/github.com/BurntSushi/toml?status.svg)](https://godoc.org/github.com/BurntSushi/toml)
### Testing
This package passes all tests in
[toml-test](https://github.com/BurntSushi/toml-test) for both the decoder
and the encoder.
### Examples
This package works similarly to how the Go standard library handles `XML`
and `JSON`. Namely, data is loaded into Go values via reflection.
For the simplest example, consider some TOML file as just a list of keys
and values:
```toml
Age = 25
Cats = [ "Cauchy", "Plato" ]
Pi = 3.14
Perfection = [ 6, 28, 496, 8128 ]
DOB = 1987-07-05T05:45:00Z
```
Which could be defined in Go as:
```go
type Config struct {
Age int
Cats []string
Pi float64
Perfection []int
DOB time.Time // requires `import time`
}
```
And then decoded with:
```go
var conf Config
if _, err := toml.Decode(tomlData, &conf); err != nil {
// handle error
}
```
You can also use struct tags if your struct field name doesn't map to a TOML
key value directly:
```toml
some_key_NAME = "wat"
```
```go
type TOML struct {
ObscureKey string `toml:"some_key_NAME"`
}
```
### Using the `encoding.TextUnmarshaler` interface
Here's an example that automatically parses duration strings into
`time.Duration` values:
```toml
[[song]]
name = "Thunder Road"
duration = "4m49s"
[[song]]
name = "Stairway to Heaven"
duration = "8m03s"
```
Which can be decoded with:
```go
type song struct {
Name string
Duration duration
}
type songs struct {
Song []song
}
var favorites songs
if _, err := toml.Decode(blob, &favorites); err != nil {
log.Fatal(err)
}
for _, s := range favorites.Song {
fmt.Printf("%s (%s)\n", s.Name, s.Duration)
}
```
And you'll also need a `duration` type that satisfies the
`encoding.TextUnmarshaler` interface:
```go
type duration struct {
time.Duration
}
func (d *duration) UnmarshalText(text []byte) error {
var err error
d.Duration, err = time.ParseDuration(string(text))
return err
}
```
### More complex usage
Here's an example of how to load the example from the official spec page:
```toml
# This is a TOML document. Boom.
title = "TOML Example"
[owner]
name = "Tom Preston-Werner"
organization = "GitHub"
bio = "GitHub Cofounder & CEO\nLikes tater tots and beer."
dob = 1979-05-27T07:32:00Z # First class dates? Why not?
[database]
server = "192.168.1.1"
ports = [ 8001, 8001, 8002 ]
connection_max = 5000
enabled = true
[servers]
# You can indent as you please. Tabs or spaces. TOML don't care.
[servers.alpha]
ip = "10.0.0.1"
dc = "eqdc10"
[servers.beta]
ip = "10.0.0.2"
dc = "eqdc10"
[clients]
data = [ ["gamma", "delta"], [1, 2] ] # just an update to make sure parsers support it
# Line breaks are OK when inside arrays
hosts = [
"alpha",
"omega"
]
```
And the corresponding Go types are:
```go
type tomlConfig struct {
Title string
Owner ownerInfo
DB database `toml:"database"`
Servers map[string]server
Clients clients
}
type ownerInfo struct {
Name string
Org string `toml:"organization"`
Bio string
DOB time.Time
}
type database struct {
Server string
Ports []int
ConnMax int `toml:"connection_max"`
Enabled bool
}
type server struct {
IP string
DC string
}
type clients struct {
Data [][]interface{}
Hosts []string
}
```
Note that a case insensitive match will be tried if an exact match can't be
found.
A working example of the above can be found in `_examples/example.{go,toml}`.
package toml
import "strings"
// MetaData allows access to meta information about TOML data that may not
// be inferrable via reflection. In particular, whether a key has been defined
// and the TOML type of a key.
type MetaData struct {
mapping map[string]interface{}
types map[string]tomlType
keys []Key
decoded map[string]bool
context Key // Used only during decoding.
}
// IsDefined returns true if the key given exists in the TOML data. The key
// should be specified hierarchially. e.g.,
//
// // access the TOML key 'a.b.c'
// IsDefined("a", "b", "c")
//
// IsDefined will return false if an empty key given. Keys are case sensitive.
func (md *MetaData) IsDefined(key ...string) bool {
if len(key) == 0 {
return false
}
var hash map[string]interface{}
var ok bool
var hashOrVal interface{} = md.mapping
for _, k := range key {
if hash, ok = hashOrVal.(map[string]interface{}); !ok {
return false
}
if hashOrVal, ok = hash[k]; !ok {
return false
}
}
return true
}
// Type returns a string representation of the type of the key specified.
//
// Type will return the empty string if given an empty key or a key that
// does not exist. Keys are case sensitive.
func (md *MetaData) Type(key ...string) string {
fullkey := strings.Join(key, ".")
if typ, ok := md.types[fullkey]; ok {
return typ.typeString()
}
return ""
}
// Key is the type of any TOML key, including key groups. Use (MetaData).Keys
// to get values of this type.
type Key []string
func (k Key) String() string {
return strings.Join(k, ".")
}
func (k Key) maybeQuotedAll() string {
var ss []string
for i := range k {
ss = append(ss, k.maybeQuoted(i))
}
return strings.Join(ss, ".")
}
func (k Key) maybeQuoted(i int) string {
quote := false
for _, c := range k[i] {
if !isBareKeyChar(c) {
quote = true
break
}
}
if quote {
return "\"" + strings.Replace(k[i], "\"", "\\\"", -1) + "\""
}
return k[i]
}
func (k Key) add(piece string) Key {
newKey := make(Key, len(k)+1)
copy(newKey, k)
newKey[len(k)] = piece
return newKey
}
// Keys returns a slice of every key in the TOML data, including key groups.
// Each key is itself a slice, where the first element is the top of the
// hierarchy and the last is the most specific.
//
// The list will have the same order as the keys appeared in the TOML data.
//
// All keys returned are non-empty.
func (md *MetaData) Keys() []Key {
return md.keys
}
// Undecoded returns all keys that have not been decoded in the order in which
// they appear in the original TOML document.
//
// This includes keys that haven't been decoded because of a Primitive value.
// Once the Primitive value is decoded, the keys will be considered decoded.
//
// Also note that decoding into an empty interface will result in no decoding,
// and so no keys will be considered decoded.
//
// In this sense, the Undecoded keys correspond to keys in the TOML document
// that do not have a concrete type in your representation.
func (md *MetaData) Undecoded() []Key {
undecoded := make([]Key, 0, len(md.keys))
for _, key := range md.keys {
if !md.decoded[key.String()] {
undecoded = append(undecoded, key)
}
}
return undecoded
}
/*
Package toml provides facilities for decoding and encoding TOML configuration
files via reflection. There is also support for delaying decoding with
the Primitive type, and querying the set of keys in a TOML document with the
MetaData type.
The specification implemented: https://github.com/toml-lang/toml
The sub-command github.com/BurntSushi/toml/cmd/tomlv can be used to verify
whether a file is a valid TOML document. It can also be used to print the
type of each key in a TOML document.
Testing
There are two important types of tests used for this package. The first is
contained inside '*_test.go' files and uses the standard Go unit testing
framework. These tests are primarily devoted to holistically testing the
decoder and encoder.
The second type of testing is used to verify the implementation's adherence
to the TOML specification. These tests have been factored into their own
project: https://github.com/BurntSushi/toml-test
The reason the tests are in a separate project is so that they can be used by
any implementation of TOML. Namely, it is language agnostic.
*/
package toml
// +build go1.2
package toml
// In order to support Go 1.1, we define our own TextMarshaler and
// TextUnmarshaler types. For Go 1.2+, we just alias them with the
// standard library interfaces.
import (
"encoding"
)
// TextMarshaler is a synonym for encoding.TextMarshaler. It is defined here
// so that Go 1.1 can be supported.
type TextMarshaler encoding.TextMarshaler
// TextUnmarshaler is a synonym for encoding.TextUnmarshaler. It is defined
// here so that Go 1.1 can be supported.
type TextUnmarshaler encoding.TextUnmarshaler
// +build !go1.2
package toml
// These interfaces were introduced in Go 1.2, so we add them manually when
// compiling for Go 1.1.
// TextMarshaler is a synonym for encoding.TextMarshaler. It is defined here
// so that Go 1.1 can be supported.
type TextMarshaler interface {
MarshalText() (text []byte, err error)
}
// TextUnmarshaler is a synonym for encoding.TextUnmarshaler. It is defined
// here so that Go 1.1 can be supported.
type TextUnmarshaler interface {
UnmarshalText(text []byte) error
}
au BufWritePost *.go silent!make tags > /dev/null 2>&1
package toml
// tomlType represents any Go type that corresponds to a TOML type.
// While the first draft of the TOML spec has a simplistic type system that
// probably doesn't need this level of sophistication, we seem to be militating
// toward adding real composite types.
type tomlType interface {
typeString() string
}
// typeEqual accepts any two types and returns true if they are equal.
func typeEqual(t1, t2 tomlType) bool {
if t1 == nil || t2 == nil {
return false
}
return t1.typeString() == t2.typeString()
}
func typeIsHash(t tomlType) bool {
return typeEqual(t, tomlHash) || typeEqual(t, tomlArrayHash)
}
type tomlBaseType string
func (btype tomlBaseType) typeString() string {
return string(btype)
}
func (btype tomlBaseType) String() string {
return btype.typeString()
}
var (
tomlInteger tomlBaseType = "Integer"
tomlFloat tomlBaseType = "Float"
tomlDatetime tomlBaseType = "Datetime"
tomlString tomlBaseType = "String"
tomlBool tomlBaseType = "Bool"
tomlArray tomlBaseType = "Array"
tomlHash tomlBaseType = "Hash"
tomlArrayHash tomlBaseType = "ArrayHash"
)
// typeOfPrimitive returns a tomlType of any primitive value in TOML.
// Primitive values are: Integer, Float, Datetime, String and Bool.
//
// Passing a lexer item other than the following will cause a BUG message
// to occur: itemString, itemBool, itemInteger, itemFloat, itemDatetime.
func (p *parser) typeOfPrimitive(lexItem item) tomlType {
switch lexItem.typ {
case itemInteger:
return tomlInteger
case itemFloat:
return tomlFloat
case itemDatetime:
return tomlDatetime
case itemString:
return tomlString
case itemMultilineString:
return tomlString
case itemRawString:
return tomlString
case itemRawMultilineString:
return tomlString
case itemBool:
return tomlBool
}
p.bug("Cannot infer primitive type of lex item '%s'.", lexItem)
panic("unreachable")
}
// typeOfArray returns a tomlType for an array given a list of types of its
// values.
//
// In the current spec, if an array is homogeneous, then its type is always
// "Array". If the array is not homogeneous, an error is generated.
func (p *parser) typeOfArray(types []tomlType) tomlType {
// Empty arrays are cool.
if len(types) == 0 {
return tomlArray
}
theType := types[0]
for _, t := range types[1:] {
if !typeEqual(theType, t) {
p.panicf("Array contains values of type '%s' and '%s', but "+
"arrays must be homogeneous.", theType, t)
}
}
return tomlArray
}
package toml
// Struct field handling is adapted from code in encoding/json:
//
// Copyright 2010 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the Go distribution.
import (
"reflect"
"sort"
"sync"
)
// A field represents a single field found in a struct.
type field struct {
name string // the name of the field (`toml` tag included)
tag bool // whether field has a `toml` tag
index []int // represents the depth of an anonymous field
typ reflect.Type // the type of the field
}
// byName sorts field by name, breaking ties with depth,
// then breaking ties with "name came from toml tag", then
// breaking ties with index sequence.
type byName []field
func (x byName) Len() int { return len(x) }
func (x byName) Swap(i, j int) { x[i], x[j] = x[j], x[i] }
func (x byName) Less(i, j int) bool {
if x[i].name != x[j].name {
return x[i].name < x[j].name
}
if len(x[i].index) != len(x[j].index) {
return len(x[i].index) < len(x[j].index)
}
if x[i].tag != x[j].tag {
return x[i].tag
}
return byIndex(x).Less(i, j)
}
// byIndex sorts field by index sequence.
type byIndex []field
func (x byIndex) Len() int { return len(x) }
func (x byIndex) Swap(i, j int) { x[i], x[j] = x[j], x[i] }
func (x byIndex) Less(i, j int) bool {
for k, xik := range x[i].index {
if k >= len(x[j].index) {
return false
}
if xik != x[j].index[k] {
return xik < x[j].index[k]
}
}
return len(x[i].index) < len(x[j].index)
}
// typeFields returns a list of fields that TOML should recognize for the given
// type. The algorithm is breadth-first search over the set of structs to
// include - the top struct and then any reachable anonymous structs.
func typeFields(t reflect.Type) []field {
// Anonymous fields to explore at the current level and the next.
current := []field{}
next := []field{{typ: t}}
// Count of queued names for current level and the next.
count := map[reflect.Type]int{}
nextCount := map[reflect.Type]int{}
// Types already visited at an earlier level.
visited := map[reflect.Type]bool{}
// Fields found.
var fields []field
for len(next) > 0 {
current, next = next, current[:0]
count, nextCount = nextCount, map[reflect.Type]int{}
for _, f := range current {
if visited[f.typ] {
continue
}
visited[f.typ] = true
// Scan f.typ for fields to include.
for i := 0; i < f.typ.NumField(); i++ {
sf := f.typ.Field(i)
if sf.PkgPath != "" && !sf.Anonymous { // unexported
continue
}
opts := getOptions(sf.Tag)
if opts.skip {
continue
}
index := make([]int, len(f.index)+1)
copy(index, f.index)
index[len(f.index)] = i
ft := sf.Type
if ft.Name() == "" && ft.Kind() == reflect.Ptr {
// Follow pointer.
ft = ft.Elem()
}
// Record found field and index sequence.
if opts.name != "" || !sf.Anonymous || ft.Kind() != reflect.Struct {
tagged := opts.name != ""
name := opts.name
if name == "" {
name = sf.Name
}
fields = append(fields, field{name, tagged, index, ft})
if count[f.typ] > 1 {
// If there were multiple instances, add a second,
// so that the annihilation code will see a duplicate.
// It only cares about the distinction between 1 or 2,
// so don't bother generating any more copies.
fields = append(fields, fields[len(fields)-1])
}
continue
}
// Record new anonymous struct to explore in next round.
nextCount[ft]++
if nextCount[ft] == 1 {
f := field{name: ft.Name(), index: index, typ: ft}
next = append(next, f)
}
}
}
}
sort.Sort(byName(fields))
// Delete all fields that are hidden by the Go rules for embedded fields,
// except that fields with TOML tags are promoted.
// The fields are sorted in primary order of name, secondary order
// of field index length. Loop over names; for each name, delete
// hidden fields by choosing the one dominant field that survives.
out := fields[:0]
for advance, i := 0, 0; i < len(fields); i += advance {
// One iteration per name.
// Find the sequence of fields with the name of this first field.
fi := fields[i]
name := fi.name
for advance = 1; i+advance < len(fields); advance++ {
fj := fields[i+advance]
if fj.name != name {
break
}
}
if advance == 1 { // Only one field with this name
out = append(out, fi)
continue
}
dominant, ok := dominantField(fields[i : i+advance])
if ok {
out = append(out, dominant)
}
}
fields = out
sort.Sort(byIndex(fields))
return fields
}
// dominantField looks through the fields, all of which are known to
// have the same name, to find the single field that dominates the
// others using Go's embedding rules, modified by the presence of
// TOML tags. If there are multiple top-level fields, the boolean
// will be false: This condition is an error in Go and we skip all
// the fields.
func dominantField(fields []field) (field, bool) {
// The fields are sorted in increasing index-length order. The winner
// must therefore be one with the shortest index length. Drop all
// longer entries, which is easy: just truncate the slice.
length := len(fields[0].index)
tagged := -1 // Index of first tagged field.
for i, f := range fields {
if len(f.index) > length {
fields = fields[:i]
break
}
if f.tag {
if tagged >= 0 {
// Multiple tagged fields at the same level: conflict.
// Return no field.
return field{}, false
}
tagged = i
}
}
if tagged >= 0 {
return fields[tagged], true
}
// All remaining fields have the same length. If there's more than one,
// we have a conflict (two fields named "X" at the same level) and we
// return no field.
if len(fields) > 1 {
return field{}, false
}
return fields[0], true
}
var fieldCache struct {
sync.RWMutex
m map[reflect.Type][]field
}
// cachedTypeFields is like typeFields but uses a cache to avoid repeated work.
func cachedTypeFields(t reflect.Type) []field {
fieldCache.RLock()
f := fieldCache.m[t]
fieldCache.RUnlock()
if f != nil {
return f
}
// Compute fields without lock.
// Might duplicate effort but won't hold other computations back.
f = typeFields(t)
if f == nil {
f = []field{}
}
fieldCache.Lock()
if fieldCache.m == nil {
fieldCache.m = map[reflect.Type][]field{}
}
fieldCache.m[t] = f
fieldCache.Unlock()
return f
}
language: go
go:
- 1.0.3
- 1.1.2
- 1.2
- tip
install:
- go get github.com/bmizerany/assert
notifications:
email: false
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
### go-simplejson
a Go package to interact with arbitrary JSON
[![Build Status](https://secure.travis-ci.org/bitly/go-simplejson.png)](http://travis-ci.org/bitly/go-simplejson)
### Importing
import github.com/bitly/go-simplejson
### Documentation
Visit the docs on [gopkgdoc](http://godoc.org/github.com/bitly/go-simplejson)
// +build !go1.1
package simplejson
import (
"encoding/json"
"errors"
"io"
"reflect"
)
// NewFromReader returns a *Json by decoding from an io.Reader
func NewFromReader(r io.Reader) (*Json, error) {
j := new(Json)
dec := json.NewDecoder(r)
err := dec.Decode(&j.data)
return j, err
}
// Implements the json.Unmarshaler interface.
func (j *Json) UnmarshalJSON(p []byte) error {
return json.Unmarshal(p, &j.data)
}
// Float64 coerces into a float64
func (j *Json) Float64() (float64, error) {
switch j.data.(type) {
case float32, float64:
return reflect.ValueOf(j.data).Float(), nil
case int, int8, int16, int32, int64:
return float64(reflect.ValueOf(j.data).Int()), nil
case uint, uint8, uint16, uint32, uint64:
return float64(reflect.ValueOf(j.data).Uint()), nil
}
return 0, errors.New("invalid value type")
}
// Int coerces into an int
func (j *Json) Int() (int, error) {
switch j.data.(type) {
case float32, float64:
return int(reflect.ValueOf(j.data).Float()), nil
case int, int8, int16, int32, int64:
return int(reflect.ValueOf(j.data).Int()), nil
case uint, uint8, uint16, uint32, uint64:
return int(reflect.ValueOf(j.data).Uint()), nil
}
return 0, errors.New("invalid value type")
}
// Int64 coerces into an int64
func (j *Json) Int64() (int64, error) {
switch j.data.(type) {
case float32, float64:
return int64(reflect.ValueOf(j.data).Float()), nil
case int, int8, int16, int32, int64:
return reflect.ValueOf(j.data).Int(), nil
case uint, uint8, uint16, uint32, uint64:
return int64(reflect.ValueOf(j.data).Uint()), nil
}
return 0, errors.New("invalid value type")
}
// Uint64 coerces into an uint64
func (j *Json) Uint64() (uint64, error) {
switch j.data.(type) {
case float32, float64:
return uint64(reflect.ValueOf(j.data).Float()), nil
case int, int8, int16, int32, int64:
return uint64(reflect.ValueOf(j.data).Int()), nil
case uint, uint8, uint16, uint32, uint64:
return reflect.ValueOf(j.data).Uint(), nil
}
return 0, errors.New("invalid value type")
}
// +build go1.1
package simplejson
import (
"bytes"
"encoding/json"
"errors"
"io"
"reflect"
"strconv"
)
// Implements the json.Unmarshaler interface.
func (j *Json) UnmarshalJSON(p []byte) error {
dec := json.NewDecoder(bytes.NewBuffer(p))
dec.UseNumber()
return dec.Decode(&j.data)
}
// NewFromReader returns a *Json by decoding from an io.Reader
func NewFromReader(r io.Reader) (*Json, error) {
j := new(Json)
dec := json.NewDecoder(r)
dec.UseNumber()
err := dec.Decode(&j.data)
return j, err
}
// Float64 coerces into a float64
func (j *Json) Float64() (float64, error) {
switch j.data.(type) {
case json.Number:
return j.data.(json.Number).Float64()
case float32, float64:
return reflect.ValueOf(j.data).Float(), nil
case int, int8, int16, int32, int64:
return float64(reflect.ValueOf(j.data).Int()), nil
case uint, uint8, uint16, uint32, uint64:
return float64(reflect.ValueOf(j.data).Uint()), nil
}
return 0, errors.New("invalid value type")
}
// Int coerces into an int
func (j *Json) Int() (int, error) {
switch j.data.(type) {
case json.Number:
i, err := j.data.(json.Number).Int64()
return int(i), err
case float32, float64:
return int(reflect.ValueOf(j.data).Float()), nil
case int, int8, int16, int32, int64:
return int(reflect.ValueOf(j.data).Int()), nil
case uint, uint8, uint16, uint32, uint64:
return int(reflect.ValueOf(j.data).Uint()), nil
}
return 0, errors.New("invalid value type")
}
// Int64 coerces into an int64
func (j *Json) Int64() (int64, error) {
switch j.data.(type) {
case json.Number:
return j.data.(json.Number).Int64()
case float32, float64:
return int64(reflect.ValueOf(j.data).Float()), nil
case int, int8, int16, int32, int64:
return reflect.ValueOf(j.data).Int(), nil
case uint, uint8, uint16, uint32, uint64:
return int64(reflect.ValueOf(j.data).Uint()), nil
}
return 0, errors.New("invalid value type")
}
// Uint64 coerces into an uint64
func (j *Json) Uint64() (uint64, error) {
switch j.data.(type) {
case json.Number:
return strconv.ParseUint(j.data.(json.Number).String(), 10, 64)
case float32, float64:
return uint64(reflect.ValueOf(j.data).Float()), nil
case int, int8, int16, int32, int64:
return uint64(reflect.ValueOf(j.data).Int()), nil
case uint, uint8, uint16, uint32, uint64:
return reflect.ValueOf(j.data).Uint(), nil
}
return 0, errors.New("invalid value type")
}
CoreOS Project
Copyright 2014 CoreOS, Inc
This product includes software developed at CoreOS, Inc.
(http://www.coreos.com/).
syntax = "proto3";
package authpb;
import "gogoproto/gogo.proto";
option (gogoproto.marshaler_all) = true;
option (gogoproto.sizer_all) = true;
option (gogoproto.unmarshaler_all) = true;
option (gogoproto.goproto_getters_all) = false;
option (gogoproto.goproto_enum_prefix_all) = false;
// User is a single entry in the bucket authUsers
message User {
bytes name = 1;
bytes password = 2;
repeated string roles = 3;
}
// Permission is a single entity
message Permission {
enum Type {
READ = 0;
WRITE = 1;
READWRITE = 2;
}
Type permType = 1;
bytes key = 2;
bytes range_end = 3;
}
// Role is a single entry in the bucket authRoles
message Role {
bytes name = 1;
repeated Permission keyPermission = 2;
}
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package clientv3
import (
"context"
"fmt"
"strings"
"github.com/coreos/etcd/auth/authpb"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"google.golang.org/grpc"
)
type (
AuthEnableResponse pb.AuthEnableResponse
AuthDisableResponse pb.AuthDisableResponse
AuthenticateResponse pb.AuthenticateResponse
AuthUserAddResponse pb.AuthUserAddResponse
AuthUserDeleteResponse pb.AuthUserDeleteResponse
AuthUserChangePasswordResponse pb.AuthUserChangePasswordResponse
AuthUserGrantRoleResponse pb.AuthUserGrantRoleResponse
AuthUserGetResponse pb.AuthUserGetResponse
AuthUserRevokeRoleResponse pb.AuthUserRevokeRoleResponse
AuthRoleAddResponse pb.AuthRoleAddResponse
AuthRoleGrantPermissionResponse pb.AuthRoleGrantPermissionResponse
AuthRoleGetResponse pb.AuthRoleGetResponse
AuthRoleRevokePermissionResponse pb.AuthRoleRevokePermissionResponse
AuthRoleDeleteResponse pb.AuthRoleDeleteResponse
AuthUserListResponse pb.AuthUserListResponse
AuthRoleListResponse pb.AuthRoleListResponse
PermissionType authpb.Permission_Type
Permission authpb.Permission
)
const (
PermRead = authpb.READ
PermWrite = authpb.WRITE
PermReadWrite = authpb.READWRITE
)
type Auth interface {
// AuthEnable enables auth of an etcd cluster.
AuthEnable(ctx context.Context) (*AuthEnableResponse, error)
// AuthDisable disables auth of an etcd cluster.
AuthDisable(ctx context.Context) (*AuthDisableResponse, error)
// UserAdd adds a new user to an etcd cluster.
UserAdd(ctx context.Context, name string, password string) (*AuthUserAddResponse, error)
// UserDelete deletes a user from an etcd cluster.
UserDelete(ctx context.Context, name string) (*AuthUserDeleteResponse, error)
// UserChangePassword changes a password of a user.
UserChangePassword(ctx context.Context, name string, password string) (*AuthUserChangePasswordResponse, error)
// UserGrantRole grants a role to a user.
UserGrantRole(ctx context.Context, user string, role string) (*AuthUserGrantRoleResponse, error)
// UserGet gets a detailed information of a user.
UserGet(ctx context.Context, name string) (*AuthUserGetResponse, error)
// UserList gets a list of all users.
UserList(ctx context.Context) (*AuthUserListResponse, error)
// UserRevokeRole revokes a role of a user.
UserRevokeRole(ctx context.Context, name string, role string) (*AuthUserRevokeRoleResponse, error)
// RoleAdd adds a new role to an etcd cluster.
RoleAdd(ctx context.Context, name string) (*AuthRoleAddResponse, error)
// RoleGrantPermission grants a permission to a role.
RoleGrantPermission(ctx context.Context, name string, key, rangeEnd string, permType PermissionType) (*AuthRoleGrantPermissionResponse, error)
// RoleGet gets a detailed information of a role.
RoleGet(ctx context.Context, role string) (*AuthRoleGetResponse, error)
// RoleList gets a list of all roles.
RoleList(ctx context.Context) (*AuthRoleListResponse, error)
// RoleRevokePermission revokes a permission from a role.
RoleRevokePermission(ctx context.Context, role string, key, rangeEnd string) (*AuthRoleRevokePermissionResponse, error)
// RoleDelete deletes a role.
RoleDelete(ctx context.Context, role string) (*AuthRoleDeleteResponse, error)
}
type auth struct {
remote pb.AuthClient
callOpts []grpc.CallOption
}
func NewAuth(c *Client) Auth {
api := &auth{remote: RetryAuthClient(c)}
if c != nil {
api.callOpts = c.callOpts
}
return api
}
func (auth *auth) AuthEnable(ctx context.Context) (*AuthEnableResponse, error) {
resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{}, auth.callOpts...)
return (*AuthEnableResponse)(resp), toErr(ctx, err)
}
func (auth *auth) AuthDisable(ctx context.Context) (*AuthDisableResponse, error) {
resp, err := auth.remote.AuthDisable(ctx, &pb.AuthDisableRequest{}, auth.callOpts...)
return (*AuthDisableResponse)(resp), toErr(ctx, err)
}
func (auth *auth) UserAdd(ctx context.Context, name string, password string) (*AuthUserAddResponse, error) {
resp, err := auth.remote.UserAdd(ctx, &pb.AuthUserAddRequest{Name: name, Password: password}, auth.callOpts...)
return (*AuthUserAddResponse)(resp), toErr(ctx, err)
}
func (auth *auth) UserDelete(ctx context.Context, name string) (*AuthUserDeleteResponse, error) {
resp, err := auth.remote.UserDelete(ctx, &pb.AuthUserDeleteRequest{Name: name}, auth.callOpts...)
return (*AuthUserDeleteResponse)(resp), toErr(ctx, err)
}
func (auth *auth) UserChangePassword(ctx context.Context, name string, password string) (*AuthUserChangePasswordResponse, error) {
resp, err := auth.remote.UserChangePassword(ctx, &pb.AuthUserChangePasswordRequest{Name: name, Password: password}, auth.callOpts...)
return (*AuthUserChangePasswordResponse)(resp), toErr(ctx, err)
}
func (auth *auth) UserGrantRole(ctx context.Context, user string, role string) (*AuthUserGrantRoleResponse, error) {
resp, err := auth.remote.UserGrantRole(ctx, &pb.AuthUserGrantRoleRequest{User: user, Role: role}, auth.callOpts...)
return (*AuthUserGrantRoleResponse)(resp), toErr(ctx, err)
}
func (auth *auth) UserGet(ctx context.Context, name string) (*AuthUserGetResponse, error) {
resp, err := auth.remote.UserGet(ctx, &pb.AuthUserGetRequest{Name: name}, auth.callOpts...)
return (*AuthUserGetResponse)(resp), toErr(ctx, err)
}
func (auth *auth) UserList(ctx context.Context) (*AuthUserListResponse, error) {
resp, err := auth.remote.UserList(ctx, &pb.AuthUserListRequest{}, auth.callOpts...)
return (*AuthUserListResponse)(resp), toErr(ctx, err)
}
func (auth *auth) UserRevokeRole(ctx context.Context, name string, role string) (*AuthUserRevokeRoleResponse, error) {
resp, err := auth.remote.UserRevokeRole(ctx, &pb.AuthUserRevokeRoleRequest{Name: name, Role: role}, auth.callOpts...)
return (*AuthUserRevokeRoleResponse)(resp), toErr(ctx, err)
}
func (auth *auth) RoleAdd(ctx context.Context, name string) (*AuthRoleAddResponse, error) {
resp, err := auth.remote.RoleAdd(ctx, &pb.AuthRoleAddRequest{Name: name}, auth.callOpts...)
return (*AuthRoleAddResponse)(resp), toErr(ctx, err)
}
func (auth *auth) RoleGrantPermission(ctx context.Context, name string, key, rangeEnd string, permType PermissionType) (*AuthRoleGrantPermissionResponse, error) {
perm := &authpb.Permission{
Key: []byte(key),
RangeEnd: []byte(rangeEnd),
PermType: authpb.Permission_Type(permType),
}
resp, err := auth.remote.RoleGrantPermission(ctx, &pb.AuthRoleGrantPermissionRequest{Name: name, Perm: perm}, auth.callOpts...)
return (*AuthRoleGrantPermissionResponse)(resp), toErr(ctx, err)
}
func (auth *auth) RoleGet(ctx context.Context, role string) (*AuthRoleGetResponse, error) {
resp, err := auth.remote.RoleGet(ctx, &pb.AuthRoleGetRequest{Role: role}, auth.callOpts...)
return (*AuthRoleGetResponse)(resp), toErr(ctx, err)
}
func (auth *auth) RoleList(ctx context.Context) (*AuthRoleListResponse, error) {
resp, err := auth.remote.RoleList(ctx, &pb.AuthRoleListRequest{}, auth.callOpts...)
return (*AuthRoleListResponse)(resp), toErr(ctx, err)
}
func (auth *auth) RoleRevokePermission(ctx context.Context, role string, key, rangeEnd string) (*AuthRoleRevokePermissionResponse, error) {
resp, err := auth.remote.RoleRevokePermission(ctx, &pb.AuthRoleRevokePermissionRequest{Role: role, Key: key, RangeEnd: rangeEnd}, auth.callOpts...)
return (*AuthRoleRevokePermissionResponse)(resp), toErr(ctx, err)
}
func (auth *auth) RoleDelete(ctx context.Context, role string) (*AuthRoleDeleteResponse, error) {
resp, err := auth.remote.RoleDelete(ctx, &pb.AuthRoleDeleteRequest{Role: role}, auth.callOpts...)
return (*AuthRoleDeleteResponse)(resp), toErr(ctx, err)
}
func StrToPermissionType(s string) (PermissionType, error) {
val, ok := authpb.Permission_Type_value[strings.ToUpper(s)]
if ok {
return PermissionType(val), nil
}
return PermissionType(-1), fmt.Errorf("invalid permission type: %s", s)
}
type authenticator struct {
conn *grpc.ClientConn // conn in-use
remote pb.AuthClient
callOpts []grpc.CallOption
}
func (auth *authenticator) authenticate(ctx context.Context, name string, password string) (*AuthenticateResponse, error) {
resp, err := auth.remote.Authenticate(ctx, &pb.AuthenticateRequest{Name: name, Password: password}, auth.callOpts...)
return (*AuthenticateResponse)(resp), toErr(ctx, err)
}
func (auth *authenticator) close() {
auth.conn.Close()
}
func newAuthenticator(ctx context.Context, target string, opts []grpc.DialOption, c *Client) (*authenticator, error) {
conn, err := grpc.DialContext(ctx, target, opts...)
if err != nil {
return nil, err
}
api := &authenticator{
conn: conn,
remote: pb.NewAuthClient(conn),
}
if c != nil {
api.callOpts = c.callOpts
}
return api, nil
}
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package balancer implements client balancer.
package balancer
import (
"strconv"
"sync"
"time"
"github.com/coreos/etcd/clientv3/balancer/connectivity"
"github.com/coreos/etcd/clientv3/balancer/picker"
"go.uber.org/zap"
"google.golang.org/grpc/balancer"
grpcconnectivity "google.golang.org/grpc/connectivity"
"google.golang.org/grpc/resolver"
_ "google.golang.org/grpc/resolver/dns" // register DNS resolver
_ "google.golang.org/grpc/resolver/passthrough" // register passthrough resolver
)
// Config defines balancer configurations.
type Config struct {
// Policy configures balancer policy.
Policy picker.Policy
// Picker implements gRPC picker.
// Leave empty if "Policy" field is not custom.
// TODO: currently custom policy is not supported.
// Picker picker.Picker
// Name defines an additional name for balancer.
// Useful for balancer testing to avoid register conflicts.
// If empty, defaults to policy name.
Name string
// Logger configures balancer logging.
// If nil, logs are discarded.
Logger *zap.Logger
}
// RegisterBuilder creates and registers a builder. Since this function calls balancer.Register, it
// must be invoked at initialization time.
func RegisterBuilder(cfg Config) {
bb := &builder{cfg}
balancer.Register(bb)
bb.cfg.Logger.Debug(
"registered balancer",
zap.String("policy", bb.cfg.Policy.String()),
zap.String("name", bb.cfg.Name),
)
}
type builder struct {
cfg Config
}
// Build is called initially when creating "ccBalancerWrapper".
// "grpc.Dial" is called to this client connection.
// Then, resolved addresses will be handled via "HandleResolvedAddrs".
func (b *builder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
bb := &baseBalancer{
id: strconv.FormatInt(time.Now().UnixNano(), 36),
policy: b.cfg.Policy,
name: b.cfg.Name,
lg: b.cfg.Logger,
addrToSc: make(map[resolver.Address]balancer.SubConn),
scToAddr: make(map[balancer.SubConn]resolver.Address),
scToSt: make(map[balancer.SubConn]grpcconnectivity.State),
currentConn: nil,
connectivityRecorder: connectivity.New(b.cfg.Logger),
// initialize picker always returns "ErrNoSubConnAvailable"
picker: picker.NewErr(balancer.ErrNoSubConnAvailable),
}
// TODO: support multiple connections
bb.mu.Lock()
bb.currentConn = cc
bb.mu.Unlock()
bb.lg.Info(
"built balancer",
zap.String("balancer-id", bb.id),
zap.String("policy", bb.policy.String()),
zap.String("resolver-target", cc.Target()),
)
return bb
}
// Name implements "grpc/balancer.Builder" interface.
func (b *builder) Name() string { return b.cfg.Name }
// Balancer defines client balancer interface.
type Balancer interface {
// Balancer is called on specified client connection. Client initiates gRPC
// connection with "grpc.Dial(addr, grpc.WithBalancerName)", and then those resolved
// addresses are passed to "grpc/balancer.Balancer.HandleResolvedAddrs".
// For each resolved address, balancer calls "balancer.ClientConn.NewSubConn".
// "grpc/balancer.Balancer.HandleSubConnStateChange" is called when connectivity state
// changes, thus requires failover logic in this method.
balancer.Balancer
// Picker calls "Pick" for every client request.
picker.Picker
}
type baseBalancer struct {
id string
policy picker.Policy
name string
lg *zap.Logger
mu sync.RWMutex
addrToSc map[resolver.Address]balancer.SubConn
scToAddr map[balancer.SubConn]resolver.Address
scToSt map[balancer.SubConn]grpcconnectivity.State
currentConn balancer.ClientConn
connectivityRecorder connectivity.Recorder
picker picker.Picker
}
// HandleResolvedAddrs implements "grpc/balancer.Balancer" interface.
// gRPC sends initial or updated resolved addresses from "Build".
func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
if err != nil {
bb.lg.Warn("HandleResolvedAddrs called with error", zap.String("balancer-id", bb.id), zap.Error(err))
return
}
bb.lg.Info("resolved",
zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
zap.Strings("addresses", addrsToStrings(addrs)),
)
bb.mu.Lock()
defer bb.mu.Unlock()
resolved := make(map[resolver.Address]struct{})
for _, addr := range addrs {
resolved[addr] = struct{}{}
if _, ok := bb.addrToSc[addr]; !ok {
sc, err := bb.currentConn.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{})
if err != nil {
bb.lg.Warn("NewSubConn failed", zap.String("picker", bb.picker.String()), zap.String("balancer-id", bb.id), zap.Error(err), zap.String("address", addr.Addr))
continue
}
bb.lg.Info("created subconn", zap.String("address", addr.Addr))
bb.addrToSc[addr] = sc
bb.scToAddr[sc] = addr
bb.scToSt[sc] = grpcconnectivity.Idle
sc.Connect()
}
}
for addr, sc := range bb.addrToSc {
if _, ok := resolved[addr]; !ok {
// was removed by resolver or failed to create subconn
bb.currentConn.RemoveSubConn(sc)
delete(bb.addrToSc, addr)
bb.lg.Info(
"removed subconn",
zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
zap.String("address", addr.Addr),
zap.String("subconn", scToString(sc)),
)
// Keep the state of this sc in bb.scToSt until sc's state becomes Shutdown.
// The entry will be deleted in HandleSubConnStateChange.
// (DO NOT) delete(bb.scToAddr, sc)
// (DO NOT) delete(bb.scToSt, sc)
}
}
}
// HandleSubConnStateChange implements "grpc/balancer.Balancer" interface.
func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s grpcconnectivity.State) {
bb.mu.Lock()
defer bb.mu.Unlock()
old, ok := bb.scToSt[sc]
if !ok {
bb.lg.Warn(
"state change for an unknown subconn",
zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
zap.String("subconn", scToString(sc)),
zap.Int("subconn-size", len(bb.scToAddr)),
zap.String("state", s.String()),
)
return
}
bb.lg.Info(
"state changed",
zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
zap.Bool("connected", s == grpcconnectivity.Ready),
zap.String("subconn", scToString(sc)),
zap.Int("subconn-size", len(bb.scToAddr)),
zap.String("address", bb.scToAddr[sc].Addr),
zap.String("old-state", old.String()),
zap.String("new-state", s.String()),
)
bb.scToSt[sc] = s
switch s {
case grpcconnectivity.Idle:
sc.Connect()
case grpcconnectivity.Shutdown:
// When an address was removed by resolver, b called RemoveSubConn but
// kept the sc's state in scToSt. Remove state for this sc here.
delete(bb.scToAddr, sc)
delete(bb.scToSt, sc)
}
oldAggrState := bb.connectivityRecorder.GetCurrentState()
bb.connectivityRecorder.RecordTransition(old, s)
// Update balancer picker when one of the following happens:
// - this sc became ready from not-ready
// - this sc became not-ready from ready
// - the aggregated state of balancer became TransientFailure from non-TransientFailure
// - the aggregated state of balancer became non-TransientFailure from TransientFailure
if (s == grpcconnectivity.Ready) != (old == grpcconnectivity.Ready) ||
(bb.connectivityRecorder.GetCurrentState() == grpcconnectivity.TransientFailure) != (oldAggrState == grpcconnectivity.TransientFailure) {
bb.updatePicker()
}
bb.currentConn.UpdateBalancerState(bb.connectivityRecorder.GetCurrentState(), bb.picker)
}
func (bb *baseBalancer) updatePicker() {
if bb.connectivityRecorder.GetCurrentState() == grpcconnectivity.TransientFailure {
bb.picker = picker.NewErr(balancer.ErrTransientFailure)
bb.lg.Info(
"updated picker to transient error picker",
zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
zap.String("policy", bb.policy.String()),
)
return
}
// only pass ready subconns to picker
scToAddr := make(map[balancer.SubConn]resolver.Address)
for addr, sc := range bb.addrToSc {
if st, ok := bb.scToSt[sc]; ok && st == grpcconnectivity.Ready {
scToAddr[sc] = addr
}
}
bb.picker = picker.New(picker.Config{
Policy: bb.policy,
Logger: bb.lg,
SubConnToResolverAddress: scToAddr,
})
bb.lg.Info(
"updated picker",
zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
zap.String("policy", bb.policy.String()),
zap.Strings("subconn-ready", scsToStrings(scToAddr)),
zap.Int("subconn-size", len(scToAddr)),
)
}
// Close implements "grpc/balancer.Balancer" interface.
// Close is a nop because base balancer doesn't have internal state to clean up,
// and it doesn't need to call RemoveSubConn for the SubConns.
func (bb *baseBalancer) Close() {
// TODO
}
// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package connectivity implements client connectivity operations.
package connectivity
import (
"sync"
"go.uber.org/zap"
"google.golang.org/grpc/connectivity"
)
// Recorder records gRPC connectivity.
type Recorder interface {
GetCurrentState() connectivity.State
RecordTransition(oldState, newState connectivity.State)
}
// New returns a new Recorder.
func New(lg *zap.Logger) Recorder {
return &recorder{lg: lg}
}
// recorder takes the connectivity states of multiple SubConns
// and returns one aggregated connectivity state.
// ref. https://github.com/grpc/grpc-go/blob/master/balancer/balancer.go
type recorder struct {
lg *zap.Logger
mu sync.RWMutex
cur connectivity.State
numReady uint64 // Number of addrConns in ready state.
numConnecting uint64 // Number of addrConns in connecting state.
numTransientFailure uint64 // Number of addrConns in transientFailure.
}
func (rc *recorder) GetCurrentState() (state connectivity.State) {
rc.mu.RLock()
defer rc.mu.RUnlock()
return rc.cur
}
// RecordTransition records state change happening in subConn and based on that
// it evaluates what aggregated state should be.
//
// - If at least one SubConn in Ready, the aggregated state is Ready;
// - Else if at least one SubConn in Connecting, the aggregated state is Connecting;
// - Else the aggregated state is TransientFailure.
//
// Idle and Shutdown are not considered.
//
// ref. https://github.com/grpc/grpc-go/blob/master/balancer/balancer.go
func (rc *recorder) RecordTransition(oldState, newState connectivity.State) {
rc.mu.Lock()
defer rc.mu.Unlock()
for idx, state := range []connectivity.State{oldState, newState} {
updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
switch state {
case connectivity.Ready:
rc.numReady += updateVal
case connectivity.Connecting:
rc.numConnecting += updateVal
case connectivity.TransientFailure:
rc.numTransientFailure += updateVal
default:
rc.lg.Warn("connectivity recorder received unknown state", zap.String("connectivity-state", state.String()))
}
}
switch { // must be exclusive, no overlap
case rc.numReady > 0:
rc.cur = connectivity.Ready
case rc.numConnecting > 0:
rc.cur = connectivity.Connecting
default:
rc.cur = connectivity.TransientFailure
}
}
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package picker defines/implements client balancer picker policy.
package picker
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package picker
import (
"context"
"google.golang.org/grpc/balancer"
)
// NewErr returns a picker that always returns err on "Pick".
func NewErr(err error) Picker {
return &errPicker{p: Error, err: err}
}
type errPicker struct {
p Policy
err error
}
func (ep *errPicker) String() string {
return ep.p.String()
}
func (ep *errPicker) Pick(context.Context, balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
return nil, nil, ep.err
}
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package picker
import (
"fmt"
"go.uber.org/zap"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/resolver"
)
// Picker defines balancer Picker methods.
type Picker interface {
balancer.Picker
String() string
}
// Config defines picker configuration.
type Config struct {
// Policy specifies etcd clientv3's built in balancer policy.
Policy Policy
// Logger defines picker logging object.
Logger *zap.Logger
// SubConnToResolverAddress maps each gRPC sub-connection to an address.
// Basically, it is a list of addresses that the Picker can pick from.
SubConnToResolverAddress map[balancer.SubConn]resolver.Address
}
// Policy defines balancer picker policy.
type Policy uint8
const (
// Error is error picker policy.
Error Policy = iota
// RoundrobinBalanced balances loads over multiple endpoints
// and implements failover in roundrobin fashion.
RoundrobinBalanced
// Custom defines custom balancer picker.
// TODO: custom picker is not supported yet.
Custom
)
func (p Policy) String() string {
switch p {
case Error:
return "picker-error"
case RoundrobinBalanced:
return "picker-roundrobin-balanced"
case Custom:
panic("'custom' picker policy is not supported yet")
default:
panic(fmt.Errorf("invalid balancer picker policy (%d)", p))
}
}
// New creates a new Picker.
func New(cfg Config) Picker {
switch cfg.Policy {
case Error:
panic("'error' picker policy is not supported here; use 'picker.NewErr'")
case RoundrobinBalanced:
return newRoundrobinBalanced(cfg)
case Custom:
panic("'custom' picker policy is not supported yet")
default:
panic(fmt.Errorf("invalid balancer picker policy (%d)", cfg.Policy))
}
}
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package picker
import (
"context"
"sync"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/resolver"
)
// newRoundrobinBalanced returns a new roundrobin balanced picker.
func newRoundrobinBalanced(cfg Config) Picker {
scs := make([]balancer.SubConn, 0, len(cfg.SubConnToResolverAddress))
for sc := range cfg.SubConnToResolverAddress {
scs = append(scs, sc)
}
return &rrBalanced{
p: RoundrobinBalanced,
lg: cfg.Logger,
scs: scs,
scToAddr: cfg.SubConnToResolverAddress,
}
}
type rrBalanced struct {
p Policy
lg *zap.Logger
mu sync.RWMutex
next int
scs []balancer.SubConn
scToAddr map[balancer.SubConn]resolver.Address
}
func (rb *rrBalanced) String() string { return rb.p.String() }
// Pick is called for every client request.
func (rb *rrBalanced) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
rb.mu.RLock()
n := len(rb.scs)
rb.mu.RUnlock()
if n == 0 {
return nil, nil, balancer.ErrNoSubConnAvailable
}
rb.mu.Lock()
cur := rb.next
sc := rb.scs[cur]
picked := rb.scToAddr[sc].Addr
rb.next = (rb.next + 1) % len(rb.scs)
rb.mu.Unlock()
rb.lg.Debug(
"picked",
zap.String("picker", rb.p.String()),
zap.String("address", picked),
zap.Int("subconn-index", cur),
zap.Int("subconn-size", n),
)
doneFunc := func(info balancer.DoneInfo) {
// TODO: error handling?
fss := []zapcore.Field{
zap.Error(info.Err),
zap.String("picker", rb.p.String()),
zap.String("address", picked),
zap.Bool("success", info.Err == nil),
zap.Bool("bytes-sent", info.BytesSent),
zap.Bool("bytes-received", info.BytesReceived),
}
if info.Err == nil {
rb.lg.Debug("balancer done", fss...)
} else {
rb.lg.Warn("balancer failed", fss...)
}
}
return sc, doneFunc, nil
}
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package endpoint resolves etcd entpoints using grpc targets of the form 'endpoint://<id>/<endpoint>'.
package endpoint
import (
"context"
"fmt"
"net"
"net/url"
"strings"
"sync"
"google.golang.org/grpc/resolver"
)
const scheme = "endpoint"
var (
targetPrefix = fmt.Sprintf("%s://", scheme)
bldr *builder
)
func init() {
bldr = &builder{
resolverGroups: make(map[string]*ResolverGroup),
}
resolver.Register(bldr)
}
type builder struct {
mu sync.RWMutex
resolverGroups map[string]*ResolverGroup
}
// NewResolverGroup creates a new ResolverGroup with the given id.
func NewResolverGroup(id string) (*ResolverGroup, error) {
return bldr.newResolverGroup(id)
}
// ResolverGroup keeps all endpoints of resolvers using a common endpoint://<id>/ target
// up-to-date.
type ResolverGroup struct {
mu sync.RWMutex
id string
endpoints []string
resolvers []*Resolver
}
func (e *ResolverGroup) addResolver(r *Resolver) {
e.mu.Lock()
addrs := epsToAddrs(e.endpoints...)
e.resolvers = append(e.resolvers, r)
e.mu.Unlock()
r.cc.NewAddress(addrs)
}
func (e *ResolverGroup) removeResolver(r *Resolver) {
e.mu.Lock()
for i, er := range e.resolvers {
if er == r {
e.resolvers = append(e.resolvers[:i], e.resolvers[i+1:]...)
break
}
}
e.mu.Unlock()
}
// SetEndpoints updates the endpoints for ResolverGroup. All registered resolver are updated
// immediately with the new endpoints.
func (e *ResolverGroup) SetEndpoints(endpoints []string) {
addrs := epsToAddrs(endpoints...)
e.mu.Lock()
e.endpoints = endpoints
for _, r := range e.resolvers {
r.cc.NewAddress(addrs)
}
e.mu.Unlock()
}
// Target constructs a endpoint target using the endpoint id of the ResolverGroup.
func (e *ResolverGroup) Target(endpoint string) string {
return Target(e.id, endpoint)
}
// Target constructs a endpoint resolver target.
func Target(id, endpoint string) string {
return fmt.Sprintf("%s://%s/%s", scheme, id, endpoint)
}
// IsTarget checks if a given target string in an endpoint resolver target.
func IsTarget(target string) bool {
return strings.HasPrefix(target, "endpoint://")
}
func (e *ResolverGroup) Close() {
bldr.close(e.id)
}
// Build creates or reuses an etcd resolver for the etcd cluster name identified by the authority part of the target.
func (b *builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
if len(target.Authority) < 1 {
return nil, fmt.Errorf("'etcd' target scheme requires non-empty authority identifying etcd cluster being routed to")
}
id := target.Authority
es, err := b.getResolverGroup(id)
if err != nil {
return nil, fmt.Errorf("failed to build resolver: %v", err)
}
r := &Resolver{
endpointID: id,
cc: cc,
}
es.addResolver(r)
return r, nil
}
func (b *builder) newResolverGroup(id string) (*ResolverGroup, error) {
b.mu.RLock()
_, ok := b.resolverGroups[id]
b.mu.RUnlock()
if ok {
return nil, fmt.Errorf("Endpoint already exists for id: %s", id)
}
es := &ResolverGroup{id: id}
b.mu.Lock()
b.resolverGroups[id] = es
b.mu.Unlock()
return es, nil
}
func (b *builder) getResolverGroup(id string) (*ResolverGroup, error) {
b.mu.RLock()
es, ok := b.resolverGroups[id]
b.mu.RUnlock()
if !ok {
return nil, fmt.Errorf("ResolverGroup not found for id: %s", id)
}
return es, nil
}
func (b *builder) close(id string) {
b.mu.Lock()
delete(b.resolverGroups, id)
b.mu.Unlock()
}
func (b *builder) Scheme() string {
return scheme
}
// Resolver provides a resolver for a single etcd cluster, identified by name.
type Resolver struct {
endpointID string
cc resolver.ClientConn
sync.RWMutex
}
// TODO: use balancer.epsToAddrs
func epsToAddrs(eps ...string) (addrs []resolver.Address) {
addrs = make([]resolver.Address, 0, len(eps))
for _, ep := range eps {
addrs = append(addrs, resolver.Address{Addr: ep})
}
return addrs
}
func (*Resolver) ResolveNow(o resolver.ResolveNowOption) {}
func (r *Resolver) Close() {
es, err := bldr.getResolverGroup(r.endpointID)
if err != nil {
return
}
es.removeResolver(r)
}
// ParseEndpoint endpoint parses an endpoint of the form
// (http|https)://<host>*|(unix|unixs)://<path>)
// and returns a protocol ('tcp' or 'unix'),
// host (or filepath if a unix socket),
// scheme (http, https, unix, unixs).
func ParseEndpoint(endpoint string) (proto string, host string, scheme string) {
proto = "tcp"
host = endpoint
url, uerr := url.Parse(endpoint)
if uerr != nil || !strings.Contains(endpoint, "://") {
return proto, host, scheme
}
scheme = url.Scheme
// strip scheme:// prefix since grpc dials by host
host = url.Host
switch url.Scheme {
case "http", "https":
case "unix", "unixs":
proto = "unix"
host = url.Host + url.Path
default:
proto, host = "", ""
}
return proto, host, scheme
}
// ParseTarget parses a endpoint://<id>/<endpoint> string and returns the parsed id and endpoint.
// If the target is malformed, an error is returned.
func ParseTarget(target string) (string, string, error) {
noPrefix := strings.TrimPrefix(target, targetPrefix)
if noPrefix == target {
return "", "", fmt.Errorf("malformed target, %s prefix is required: %s", targetPrefix, target)
}
parts := strings.SplitN(noPrefix, "/", 2)
if len(parts) != 2 {
return "", "", fmt.Errorf("malformed target, expected %s://<id>/<endpoint>, but got %s", scheme, target)
}
return parts[0], parts[1], nil
}
// Dialer dials a endpoint using net.Dialer.
// Context cancelation and timeout are supported.
func Dialer(ctx context.Context, dialEp string) (net.Conn, error) {
proto, host, _ := ParseEndpoint(dialEp)
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
dialer := &net.Dialer{}
if deadline, ok := ctx.Deadline(); ok {
dialer.Deadline = deadline
}
return dialer.DialContext(ctx, proto, host)
}
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package balancer
import (
"fmt"
"net/url"
"sort"
"sync/atomic"
"time"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/resolver"
)
func scToString(sc balancer.SubConn) string {
return fmt.Sprintf("%p", sc)
}
func scsToStrings(scs map[balancer.SubConn]resolver.Address) (ss []string) {
ss = make([]string, 0, len(scs))
for sc, a := range scs {
ss = append(ss, fmt.Sprintf("%s (%s)", a.Addr, scToString(sc)))
}
sort.Strings(ss)
return ss
}
func addrsToStrings(addrs []resolver.Address) (ss []string) {
ss = make([]string, len(addrs))
for i := range addrs {
ss[i] = addrs[i].Addr
}
sort.Strings(ss)
return ss
}
func epsToAddrs(eps ...string) (addrs []resolver.Address) {
addrs = make([]resolver.Address, 0, len(eps))
for _, ep := range eps {
u, err := url.Parse(ep)
if err != nil {
addrs = append(addrs, resolver.Address{Addr: ep, Type: resolver.Backend})
continue
}
addrs = append(addrs, resolver.Address{Addr: u.Host, Type: resolver.Backend})
}
return addrs
}
var genN = new(uint32)
func genName() string {
now := time.Now().UnixNano()
return fmt.Sprintf("%X%X", now, atomic.AddUint32(genN, 1))
}
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package clientv3
import (
"context"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/pkg/types"
"google.golang.org/grpc"
)
type (
Member pb.Member
MemberListResponse pb.MemberListResponse
MemberAddResponse pb.MemberAddResponse
MemberRemoveResponse pb.MemberRemoveResponse
MemberUpdateResponse pb.MemberUpdateResponse
)
type Cluster interface {
// MemberList lists the current cluster membership.
MemberList(ctx context.Context) (*MemberListResponse, error)
// MemberAdd adds a new member into the cluster.
MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error)
// MemberRemove removes an existing member from the cluster.
MemberRemove(ctx context.Context, id uint64) (*MemberRemoveResponse, error)
// MemberUpdate updates the peer addresses of the member.
MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*MemberUpdateResponse, error)
}
type cluster struct {
remote pb.ClusterClient
callOpts []grpc.CallOption
}
func NewCluster(c *Client) Cluster {
api := &cluster{remote: RetryClusterClient(c)}
if c != nil {
api.callOpts = c.callOpts
}
return api
}
func NewClusterFromClusterClient(remote pb.ClusterClient, c *Client) Cluster {
api := &cluster{remote: remote}
if c != nil {
api.callOpts = c.callOpts
}
return api
}
func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error) {
// fail-fast before panic in rafthttp
if _, err := types.NewURLs(peerAddrs); err != nil {
return nil, err
}
r := &pb.MemberAddRequest{PeerURLs: peerAddrs}
resp, err := c.remote.MemberAdd(ctx, r, c.callOpts...)
if err != nil {
return nil, toErr(ctx, err)
}
return (*MemberAddResponse)(resp), nil
}
func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveResponse, error) {
r := &pb.MemberRemoveRequest{ID: id}
resp, err := c.remote.MemberRemove(ctx, r, c.callOpts...)
if err != nil {
return nil, toErr(ctx, err)
}
return (*MemberRemoveResponse)(resp), nil
}
func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*MemberUpdateResponse, error) {
// fail-fast before panic in rafthttp
if _, err := types.NewURLs(peerAddrs); err != nil {
return nil, err
}
// it is safe to retry on update.
r := &pb.MemberUpdateRequest{ID: id, PeerURLs: peerAddrs}
resp, err := c.remote.MemberUpdate(ctx, r, c.callOpts...)
if err == nil {
return (*MemberUpdateResponse)(resp), nil
}
return nil, toErr(ctx, err)
}
func (c *cluster) MemberList(ctx context.Context) (*MemberListResponse, error) {
// it is safe to retry on list.
resp, err := c.remote.MemberList(ctx, &pb.MemberListRequest{}, c.callOpts...)
if err == nil {
return (*MemberListResponse)(resp), nil
}
return nil, toErr(ctx, err)
}
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package clientv3
import (
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)
// CompactOp represents a compact operation.
type CompactOp struct {
revision int64
physical bool
}
// CompactOption configures compact operation.
type CompactOption func(*CompactOp)
func (op *CompactOp) applyCompactOpts(opts []CompactOption) {
for _, opt := range opts {
opt(op)
}
}
// OpCompact wraps slice CompactOption to create a CompactOp.
func OpCompact(rev int64, opts ...CompactOption) CompactOp {
ret := CompactOp{revision: rev}
ret.applyCompactOpts(opts)
return ret
}
func (op CompactOp) toRequest() *pb.CompactionRequest {
return &pb.CompactionRequest{Revision: op.revision, Physical: op.physical}
}
// WithCompactPhysical makes Compact wait until all compacted entries are
// removed from the etcd server's storage.
func WithCompactPhysical() CompactOption {
return func(op *CompactOp) { op.physical = true }
}
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package clientv3
import (
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)
type CompareTarget int
type CompareResult int
const (
CompareVersion CompareTarget = iota
CompareCreated
CompareModified
CompareValue
)
type Cmp pb.Compare
func Compare(cmp Cmp, result string, v interface{}) Cmp {
var r pb.Compare_CompareResult
switch result {
case "=":
r = pb.Compare_EQUAL
case "!=":
r = pb.Compare_NOT_EQUAL
case ">":
r = pb.Compare_GREATER
case "<":
r = pb.Compare_LESS
default:
panic("Unknown result op")
}
cmp.Result = r
switch cmp.Target {
case pb.Compare_VALUE:
val, ok := v.(string)
if !ok {
panic("bad compare value")
}
cmp.TargetUnion = &pb.Compare_Value{Value: []byte(val)}
case pb.Compare_VERSION:
cmp.TargetUnion = &pb.Compare_Version{Version: mustInt64(v)}
case pb.Compare_CREATE:
cmp.TargetUnion = &pb.Compare_CreateRevision{CreateRevision: mustInt64(v)}
case pb.Compare_MOD:
cmp.TargetUnion = &pb.Compare_ModRevision{ModRevision: mustInt64(v)}
case pb.Compare_LEASE:
cmp.TargetUnion = &pb.Compare_Lease{Lease: mustInt64orLeaseID(v)}
default:
panic("Unknown compare type")
}
return cmp
}
func Value(key string) Cmp {
return Cmp{Key: []byte(key), Target: pb.Compare_VALUE}
}
func Version(key string) Cmp {
return Cmp{Key: []byte(key), Target: pb.Compare_VERSION}
}
func CreateRevision(key string) Cmp {
return Cmp{Key: []byte(key), Target: pb.Compare_CREATE}
}
func ModRevision(key string) Cmp {
return Cmp{Key: []byte(key), Target: pb.Compare_MOD}
}
// LeaseValue compares a key's LeaseID to a value of your choosing. The empty
// LeaseID is 0, otherwise known as `NoLease`.
func LeaseValue(key string) Cmp {
return Cmp{Key: []byte(key), Target: pb.Compare_LEASE}
}
// KeyBytes returns the byte slice holding with the comparison key.
func (cmp *Cmp) KeyBytes() []byte { return cmp.Key }
// WithKeyBytes sets the byte slice for the comparison key.
func (cmp *Cmp) WithKeyBytes(key []byte) { cmp.Key = key }
// ValueBytes returns the byte slice holding the comparison value, if any.
func (cmp *Cmp) ValueBytes() []byte {
if tu, ok := cmp.TargetUnion.(*pb.Compare_Value); ok {
return tu.Value
}
return nil
}
// WithValueBytes sets the byte slice for the comparison's value.
func (cmp *Cmp) WithValueBytes(v []byte) { cmp.TargetUnion.(*pb.Compare_Value).Value = v }
// WithRange sets the comparison to scan the range [key, end).
func (cmp Cmp) WithRange(end string) Cmp {
cmp.RangeEnd = []byte(end)
return cmp
}
// WithPrefix sets the comparison to scan all keys prefixed by the key.
func (cmp Cmp) WithPrefix() Cmp {
cmp.RangeEnd = getPrefix(cmp.Key)
return cmp
}
// mustInt64 panics if val isn't an int or int64. It returns an int64 otherwise.
func mustInt64(val interface{}) int64 {
if v, ok := val.(int64); ok {
return v
}
if v, ok := val.(int); ok {
return int64(v)
}
panic("bad value")
}
// mustInt64orLeaseID panics if val isn't a LeaseID, int or int64. It returns an
// int64 otherwise.
func mustInt64orLeaseID(val interface{}) int64 {
if v, ok := val.(LeaseID); ok {
return int64(v)
}
return mustInt64(val)
}
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package clientv3
import (
"context"
"crypto/tls"
"time"
"go.uber.org/zap"
"google.golang.org/grpc"
)
type Config struct {
// Endpoints is a list of URLs.
Endpoints []string `json:"endpoints"`
// AutoSyncInterval is the interval to update endpoints with its latest members.
// 0 disables auto-sync. By default auto-sync is disabled.
AutoSyncInterval time.Duration `json:"auto-sync-interval"`
// DialTimeout is the timeout for failing to establish a connection.
DialTimeout time.Duration `json:"dial-timeout"`
// DialKeepAliveTime is the time after which client pings the server to see if
// transport is alive.
DialKeepAliveTime time.Duration `json:"dial-keep-alive-time"`
// DialKeepAliveTimeout is the time that the client waits for a response for the
// keep-alive probe. If the response is not received in this time, the connection is closed.
DialKeepAliveTimeout time.Duration `json:"dial-keep-alive-timeout"`
// MaxCallSendMsgSize is the client-side request send limit in bytes.
// If 0, it defaults to 2.0 MiB (2 * 1024 * 1024).
// Make sure that "MaxCallSendMsgSize" < server-side default send/recv limit.
// ("--max-request-bytes" flag to etcd or "embed.Config.MaxRequestBytes").
MaxCallSendMsgSize int
// MaxCallRecvMsgSize is the client-side response receive limit.
// If 0, it defaults to "math.MaxInt32", because range response can
// easily exceed request send limits.
// Make sure that "MaxCallRecvMsgSize" >= server-side default send/recv limit.
// ("--max-request-bytes" flag to etcd or "embed.Config.MaxRequestBytes").
MaxCallRecvMsgSize int
// TLS holds the client secure credentials, if any.
TLS *tls.Config
// Username is a user name for authentication.
Username string `json:"username"`
// Password is a password for authentication.
Password string `json:"password"`
// RejectOldCluster when set will refuse to create a client against an outdated cluster.
RejectOldCluster bool `json:"reject-old-cluster"`
// DialOptions is a list of dial options for the grpc client (e.g., for interceptors).
// For example, pass "grpc.WithBlock()" to block until the underlying connection is up.
// Without this, Dial returns immediately and connecting the server happens in background.
DialOptions []grpc.DialOption
// LogConfig configures client-side logger.
// If nil, use the default logger.
// TODO: configure gRPC logger
LogConfig *zap.Config
// Context is the default client context; it can be used to cancel grpc dial out and
// other operations that do not have an explicit context.
Context context.Context
// PermitWithoutStream when set will allow client to send keepalive pings to server without any active streams(RPCs).
PermitWithoutStream bool `json:"permit-without-stream"`
}
// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package credentials implements gRPC credential interface with etcd specific logic.
// e.g., client handshake with custom authority parameter
package credentials
import (
"context"
"crypto/tls"
"net"
"sync"
"github.com/coreos/etcd/clientv3/balancer/resolver/endpoint"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
grpccredentials "google.golang.org/grpc/credentials"
)
// Config defines gRPC credential configuration.
type Config struct {
TLSConfig *tls.Config
}
// Bundle defines gRPC credential interface.
type Bundle interface {
grpccredentials.Bundle
UpdateAuthToken(token string)
}
// NewBundle constructs a new gRPC credential bundle.
func NewBundle(cfg Config) Bundle {
return &bundle{
tc: newTransportCredential(cfg.TLSConfig),
rc: newPerRPCCredential(),
}
}
// bundle implements "grpccredentials.Bundle" interface.
type bundle struct {
tc *transportCredential
rc *perRPCCredential
}
func (b *bundle) TransportCredentials() grpccredentials.TransportCredentials {
return b.tc
}
func (b *bundle) PerRPCCredentials() grpccredentials.PerRPCCredentials {
return b.rc
}
func (b *bundle) NewWithMode(mode string) (grpccredentials.Bundle, error) {
// no-op
return nil, nil
}
// transportCredential implements "grpccredentials.TransportCredentials" interface.
// transportCredential wraps TransportCredentials to track which
// addresses are dialed for which endpoints, and then sets the authority when checking the endpoint's cert to the
// hostname or IP of the dialed endpoint.
// This is a workaround of a gRPC load balancer issue. gRPC uses the dialed target's service name as the authority when
// checking all endpoint certs, which does not work for etcd servers using their hostname or IP as the Subject Alternative Name
// in their TLS certs.
// To enable, include both WithTransportCredentials(creds) and WithContextDialer(creds.Dialer)
// when dialing.
type transportCredential struct {
gtc grpccredentials.TransportCredentials
mu sync.Mutex
// addrToEndpoint maps from the connection addresses that are dialed to the hostname or IP of the
// endpoint provided to the dialer when dialing
addrToEndpoint map[string]string
}
func newTransportCredential(cfg *tls.Config) *transportCredential {
return &transportCredential{
gtc: grpccredentials.NewTLS(cfg),
addrToEndpoint: map[string]string{},
}
}
func (tc *transportCredential) ClientHandshake(ctx context.Context, authority string, rawConn net.Conn) (net.Conn, grpccredentials.AuthInfo, error) {
// Set the authority when checking the endpoint's cert to the hostname or IP of the dialed endpoint
tc.mu.Lock()
dialEp, ok := tc.addrToEndpoint[rawConn.RemoteAddr().String()]
tc.mu.Unlock()
if ok {
_, host, _ := endpoint.ParseEndpoint(dialEp)
authority = host
}
return tc.gtc.ClientHandshake(ctx, authority, rawConn)
}
// return true if given string is an IP.
func isIP(ep string) bool {
return net.ParseIP(ep) != nil
}
func (tc *transportCredential) ServerHandshake(rawConn net.Conn) (net.Conn, grpccredentials.AuthInfo, error) {
return tc.gtc.ServerHandshake(rawConn)
}
func (tc *transportCredential) Info() grpccredentials.ProtocolInfo {
return tc.gtc.Info()
}
func (tc *transportCredential) Clone() grpccredentials.TransportCredentials {
copy := map[string]string{}
tc.mu.Lock()
for k, v := range tc.addrToEndpoint {
copy[k] = v
}
tc.mu.Unlock()
return &transportCredential{
gtc: tc.gtc.Clone(),
addrToEndpoint: copy,
}
}
func (tc *transportCredential) OverrideServerName(serverNameOverride string) error {
return tc.gtc.OverrideServerName(serverNameOverride)
}
func (tc *transportCredential) Dialer(ctx context.Context, dialEp string) (net.Conn, error) {
// Keep track of which addresses are dialed for which endpoints
conn, err := endpoint.Dialer(ctx, dialEp)
if conn != nil {
tc.mu.Lock()
tc.addrToEndpoint[conn.RemoteAddr().String()] = dialEp
tc.mu.Unlock()
}
return conn, err
}
// perRPCCredential implements "grpccredentials.PerRPCCredentials" interface.
type perRPCCredential struct {
authToken string
authTokenMu sync.RWMutex
}
func newPerRPCCredential() *perRPCCredential { return &perRPCCredential{} }
func (rc *perRPCCredential) RequireTransportSecurity() bool { return false }
func (rc *perRPCCredential) GetRequestMetadata(ctx context.Context, s ...string) (map[string]string, error) {
rc.authTokenMu.RLock()
authToken := rc.authToken
rc.authTokenMu.RUnlock()
return map[string]string{rpctypes.TokenFieldNameGRPC: authToken}, nil
}
func (b *bundle) UpdateAuthToken(token string) {
if b.rc == nil {
return
}
b.rc.UpdateAuthToken(token)
}
func (rc *perRPCCredential) UpdateAuthToken(token string) {
rc.authTokenMu.Lock()
rc.authToken = token
rc.authTokenMu.Unlock()
}
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package clientv3 implements the official Go etcd client for v3.
//
// Create client using `clientv3.New`:
//
// // expect dial time-out on ipv4 blackhole
// _, err := clientv3.New(clientv3.Config{
// Endpoints: []string{"http://254.0.0.1:12345"},
// DialTimeout: 2 * time.Second
// })
//
// // etcd clientv3 >= v3.2.10, grpc/grpc-go >= v1.7.3
// if err == context.DeadlineExceeded {
// // handle errors
// }
//
// // etcd clientv3 <= v3.2.9, grpc/grpc-go <= v1.2.1
// if err == grpc.ErrClientConnTimeout {
// // handle errors
// }
//
// cli, err := clientv3.New(clientv3.Config{
// Endpoints: []string{"localhost:2379", "localhost:22379", "localhost:32379"},
// DialTimeout: 5 * time.Second,
// })
// if err != nil {
// // handle error!
// }
// defer cli.Close()
//
// Make sure to close the client after using it. If the client is not closed, the
// connection will have leaky goroutines.
//
// To specify a client request timeout, wrap the context with context.WithTimeout:
//
// ctx, cancel := context.WithTimeout(context.Background(), timeout)
// resp, err := kvc.Put(ctx, "sample_key", "sample_value")
// cancel()
// if err != nil {
// // handle error!
// }
// // use the response
//
// The Client has internal state (watchers and leases), so Clients should be reused instead of created as needed.
// Clients are safe for concurrent use by multiple goroutines.
//
// etcd client returns 3 types of errors:
//
// 1. context error: canceled or deadline exceeded.
// 2. gRPC status error: e.g. when clock drifts in server-side before client's context deadline exceeded.
// 3. gRPC error: see https://github.com/coreos/etcd/blob/master/etcdserver/api/v3rpc/rpctypes/error.go
//
// Here is the example code to handle client errors:
//
// resp, err := kvc.Put(ctx, "", "")
// if err != nil {
// if err == context.Canceled {
// // ctx is canceled by another routine
// } else if err == context.DeadlineExceeded {
// // ctx is attached with a deadline and it exceeded
// } else if ev, ok := status.FromError(err); ok {
// code := ev.Code()
// if code == codes.DeadlineExceeded {
// // server-side context might have timed-out first (due to clock skew)
// // while original client-side context is not timed-out yet
// }
// } else if verr, ok := err.(*v3rpc.ErrEmptyKey); ok {
// // process (verr.Errors)
// } else {
// // bad cluster endpoints, which are not etcd servers
// }
// }
//
// go func() { cli.Close() }()
// _, err := kvc.Get(ctx, "a")
// if err != nil {
// if err == context.Canceled {
// // grpc balancer calls 'Get' with an inflight client.Close
// } else if err == grpc.ErrClientConnClosing {
// // grpc balancer calls 'Get' after client.Close.
// }
// }
//
package clientv3
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package clientv3
import (
"context"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"google.golang.org/grpc"
)
type (
CompactResponse pb.CompactionResponse
PutResponse pb.PutResponse
GetResponse pb.RangeResponse
DeleteResponse pb.DeleteRangeResponse
TxnResponse pb.TxnResponse
)
type KV interface {
// Put puts a key-value pair into etcd.
// Note that key,value can be plain bytes array and string is
// an immutable representation of that bytes array.
// To get a string of bytes, do string([]byte{0x10, 0x20}).
Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
// Get retrieves keys.
// By default, Get will return the value for "key", if any.
// When passed WithRange(end), Get will return the keys in the range [key, end).
// When passed WithFromKey(), Get returns keys greater than or equal to key.
// When passed WithRev(rev) with rev > 0, Get retrieves keys at the given revision;
// if the required revision is compacted, the request will fail with ErrCompacted .
// When passed WithLimit(limit), the number of returned keys is bounded by limit.
// When passed WithSort(), the keys will be sorted.
Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)
// Delete deletes a key, or optionally using WithRange(end), [key, end).
Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)
// Compact compacts etcd KV history before the given rev.
Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)
// Do applies a single Op on KV without a transaction.
// Do is useful when creating arbitrary operations to be issued at a
// later time; the user can range over the operations, calling Do to
// execute them. Get/Put/Delete, on the other hand, are best suited
// for when the operation should be issued at the time of declaration.
Do(ctx context.Context, op Op) (OpResponse, error)
// Txn creates a transaction.
Txn(ctx context.Context) Txn
}
type OpResponse struct {
put *PutResponse
get *GetResponse
del *DeleteResponse
txn *TxnResponse
}
func (op OpResponse) Put() *PutResponse { return op.put }
func (op OpResponse) Get() *GetResponse { return op.get }
func (op OpResponse) Del() *DeleteResponse { return op.del }
func (op OpResponse) Txn() *TxnResponse { return op.txn }
func (resp *PutResponse) OpResponse() OpResponse {
return OpResponse{put: resp}
}
func (resp *GetResponse) OpResponse() OpResponse {
return OpResponse{get: resp}
}
func (resp *DeleteResponse) OpResponse() OpResponse {
return OpResponse{del: resp}
}
func (resp *TxnResponse) OpResponse() OpResponse {
return OpResponse{txn: resp}
}
type kv struct {
remote pb.KVClient
callOpts []grpc.CallOption
}
func NewKV(c *Client) KV {
api := &kv{remote: RetryKVClient(c)}
if c != nil {
api.callOpts = c.callOpts
}
return api
}
func NewKVFromKVClient(remote pb.KVClient, c *Client) KV {
api := &kv{remote: remote}
if c != nil {
api.callOpts = c.callOpts
}
return api
}
func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) {
r, err := kv.Do(ctx, OpPut(key, val, opts...))
return r.put, toErr(ctx, err)
}
func (kv *kv) Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error) {
r, err := kv.Do(ctx, OpGet(key, opts...))
return r.get, toErr(ctx, err)
}
func (kv *kv) Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error) {
r, err := kv.Do(ctx, OpDelete(key, opts...))
return r.del, toErr(ctx, err)
}
func (kv *kv) Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error) {
resp, err := kv.remote.Compact(ctx, OpCompact(rev, opts...).toRequest(), kv.callOpts...)
if err != nil {
return nil, toErr(ctx, err)
}
return (*CompactResponse)(resp), err
}
func (kv *kv) Txn(ctx context.Context) Txn {
return &txn{
kv: kv,
ctx: ctx,
callOpts: kv.callOpts,
}
}
func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
var err error
switch op.t {
case tRange:
var resp *pb.RangeResponse
resp, err = kv.remote.Range(ctx, op.toRangeRequest(), kv.callOpts...)
if err == nil {
return OpResponse{get: (*GetResponse)(resp)}, nil
}
case tPut:
var resp *pb.PutResponse
r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV, IgnoreValue: op.ignoreValue, IgnoreLease: op.ignoreLease}
resp, err = kv.remote.Put(ctx, r, kv.callOpts...)
if err == nil {
return OpResponse{put: (*PutResponse)(resp)}, nil
}
case tDeleteRange:
var resp *pb.DeleteRangeResponse
r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end, PrevKv: op.prevKV}
resp, err = kv.remote.DeleteRange(ctx, r, kv.callOpts...)
if err == nil {
return OpResponse{del: (*DeleteResponse)(resp)}, nil
}
case tTxn:
var resp *pb.TxnResponse
resp, err = kv.remote.Txn(ctx, op.toTxnRequest(), kv.callOpts...)
if err == nil {
return OpResponse{txn: (*TxnResponse)(resp)}, nil
}
default:
panic("Unknown op")
}
return OpResponse{}, toErr(ctx, err)
}
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package clientv3
import (
"io/ioutil"
"sync"
"github.com/coreos/etcd/pkg/logutil"
"google.golang.org/grpc/grpclog"
)
var (
lgMu sync.RWMutex
lg logutil.Logger
)
type settableLogger struct {
l grpclog.LoggerV2
mu sync.RWMutex
}
func init() {
// disable client side logs by default
lg = &settableLogger{}
SetLogger(grpclog.NewLoggerV2(ioutil.Discard, ioutil.Discard, ioutil.Discard))
}
// SetLogger sets client-side Logger.
func SetLogger(l grpclog.LoggerV2) {
lgMu.Lock()
lg = logutil.NewLogger(l)
// override grpclog so that any changes happen with locking
grpclog.SetLoggerV2(lg)
lgMu.Unlock()
}
// GetLogger returns the current logutil.Logger.
func GetLogger() logutil.Logger {
lgMu.RLock()
l := lg
lgMu.RUnlock()
return l
}
// NewLogger returns a new Logger with logutil.Logger.
func NewLogger(gl grpclog.LoggerV2) logutil.Logger {
return &settableLogger{l: gl}
}
func (s *settableLogger) get() grpclog.LoggerV2 {
s.mu.RLock()
l := s.l
s.mu.RUnlock()
return l
}
// implement the grpclog.LoggerV2 interface
func (s *settableLogger) Info(args ...interface{}) { s.get().Info(args...) }
func (s *settableLogger) Infof(format string, args ...interface{}) { s.get().Infof(format, args...) }
func (s *settableLogger) Infoln(args ...interface{}) { s.get().Infoln(args...) }
func (s *settableLogger) Warning(args ...interface{}) { s.get().Warning(args...) }
func (s *settableLogger) Warningf(format string, args ...interface{}) {
s.get().Warningf(format, args...)
}
func (s *settableLogger) Warningln(args ...interface{}) { s.get().Warningln(args...) }
func (s *settableLogger) Error(args ...interface{}) { s.get().Error(args...) }
func (s *settableLogger) Errorf(format string, args ...interface{}) {
s.get().Errorf(format, args...)
}
func (s *settableLogger) Errorln(args ...interface{}) { s.get().Errorln(args...) }
func (s *settableLogger) Fatal(args ...interface{}) { s.get().Fatal(args...) }
func (s *settableLogger) Fatalf(format string, args ...interface{}) { s.get().Fatalf(format, args...) }
func (s *settableLogger) Fatalln(args ...interface{}) { s.get().Fatalln(args...) }
func (s *settableLogger) Print(args ...interface{}) { s.get().Info(args...) }
func (s *settableLogger) Printf(format string, args ...interface{}) { s.get().Infof(format, args...) }
func (s *settableLogger) Println(args ...interface{}) { s.get().Infoln(args...) }
func (s *settableLogger) V(l int) bool { return s.get().V(l) }
func (s *settableLogger) Lvl(lvl int) grpclog.LoggerV2 {
s.mu.RLock()
l := s.l
s.mu.RUnlock()
if l.V(lvl) {
return s
}
return logutil.NewDiscardLogger()
}
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package clientv3
import (
"context"
"fmt"
"io"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"google.golang.org/grpc"
)
type (
DefragmentResponse pb.DefragmentResponse
AlarmResponse pb.AlarmResponse
AlarmMember pb.AlarmMember
StatusResponse pb.StatusResponse
HashKVResponse pb.HashKVResponse
MoveLeaderResponse pb.MoveLeaderResponse
)
type Maintenance interface {
// AlarmList gets all active alarms.
AlarmList(ctx context.Context) (*AlarmResponse, error)
// AlarmDisarm disarms a given alarm.
AlarmDisarm(ctx context.Context, m *AlarmMember) (*AlarmResponse, error)
// Defragment releases wasted space from internal fragmentation on a given etcd member.
// Defragment is only needed when deleting a large number of keys and want to reclaim
// the resources.
// Defragment is an expensive operation. User should avoid defragmenting multiple members
// at the same time.
// To defragment multiple members in the cluster, user need to call defragment multiple
// times with different endpoints.
Defragment(ctx context.Context, endpoint string) (*DefragmentResponse, error)
// Status gets the status of the endpoint.
Status(ctx context.Context, endpoint string) (*StatusResponse, error)
// HashKV returns a hash of the KV state at the time of the RPC.
// If revision is zero, the hash is computed on all keys. If the revision
// is non-zero, the hash is computed on all keys at or below the given revision.
HashKV(ctx context.Context, endpoint string, rev int64) (*HashKVResponse, error)
// Snapshot provides a reader for a point-in-time snapshot of etcd.
// If the context "ctx" is canceled or timed out, reading from returned
// "io.ReadCloser" would error out (e.g. context.Canceled, context.DeadlineExceeded).
Snapshot(ctx context.Context) (io.ReadCloser, error)
// MoveLeader requests current leader to transfer its leadership to the transferee.
// Request must be made to the leader.
MoveLeader(ctx context.Context, transfereeID uint64) (*MoveLeaderResponse, error)
}
type maintenance struct {
dial func(endpoint string) (pb.MaintenanceClient, func(), error)
remote pb.MaintenanceClient
callOpts []grpc.CallOption
}
func NewMaintenance(c *Client) Maintenance {
api := &maintenance{
dial: func(endpoint string) (pb.MaintenanceClient, func(), error) {
conn, err := c.Dial(endpoint)
if err != nil {
return nil, nil, fmt.Errorf("failed to dial endpoint %s with maintenance client: %v", endpoint, err)
}
cancel := func() { conn.Close() }
return RetryMaintenanceClient(c, conn), cancel, nil
},
remote: RetryMaintenanceClient(c, c.conn),
}
if c != nil {
api.callOpts = c.callOpts
}
return api
}
func NewMaintenanceFromMaintenanceClient(remote pb.MaintenanceClient, c *Client) Maintenance {
api := &maintenance{
dial: func(string) (pb.MaintenanceClient, func(), error) {
return remote, func() {}, nil
},
remote: remote,
}
if c != nil {
api.callOpts = c.callOpts
}
return api
}
func (m *maintenance) AlarmList(ctx context.Context) (*AlarmResponse, error) {
req := &pb.AlarmRequest{
Action: pb.AlarmRequest_GET,
MemberID: 0, // all
Alarm: pb.AlarmType_NONE, // all
}
resp, err := m.remote.Alarm(ctx, req, m.callOpts...)
if err == nil {
return (*AlarmResponse)(resp), nil
}
return nil, toErr(ctx, err)
}
func (m *maintenance) AlarmDisarm(ctx context.Context, am *AlarmMember) (*AlarmResponse, error) {
req := &pb.AlarmRequest{
Action: pb.AlarmRequest_DEACTIVATE,
MemberID: am.MemberID,
Alarm: am.Alarm,
}
if req.MemberID == 0 && req.Alarm == pb.AlarmType_NONE {
ar, err := m.AlarmList(ctx)
if err != nil {
return nil, toErr(ctx, err)
}
ret := AlarmResponse{}
for _, am := range ar.Alarms {
dresp, derr := m.AlarmDisarm(ctx, (*AlarmMember)(am))
if derr != nil {
return nil, toErr(ctx, derr)
}
ret.Alarms = append(ret.Alarms, dresp.Alarms...)
}
return &ret, nil
}
resp, err := m.remote.Alarm(ctx, req, m.callOpts...)
if err == nil {
return (*AlarmResponse)(resp), nil
}
return nil, toErr(ctx, err)
}
func (m *maintenance) Defragment(ctx context.Context, endpoint string) (*DefragmentResponse, error) {
remote, cancel, err := m.dial(endpoint)
if err != nil {
return nil, toErr(ctx, err)
}
defer cancel()
resp, err := remote.Defragment(ctx, &pb.DefragmentRequest{}, m.callOpts...)
if err != nil {
return nil, toErr(ctx, err)
}
return (*DefragmentResponse)(resp), nil
}
func (m *maintenance) Status(ctx context.Context, endpoint string) (*StatusResponse, error) {
remote, cancel, err := m.dial(endpoint)
if err != nil {
return nil, toErr(ctx, err)
}
defer cancel()
resp, err := remote.Status(ctx, &pb.StatusRequest{}, m.callOpts...)
if err != nil {
return nil, toErr(ctx, err)
}
return (*StatusResponse)(resp), nil
}
func (m *maintenance) HashKV(ctx context.Context, endpoint string, rev int64) (*HashKVResponse, error) {
remote, cancel, err := m.dial(endpoint)
if err != nil {
return nil, toErr(ctx, err)
}
defer cancel()
resp, err := remote.HashKV(ctx, &pb.HashKVRequest{Revision: rev}, m.callOpts...)
if err != nil {
return nil, toErr(ctx, err)
}
return (*HashKVResponse)(resp), nil
}
func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) {
ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{}, append(m.callOpts, withMax(defaultStreamMaxRetries))...)
if err != nil {
return nil, toErr(ctx, err)
}
pr, pw := io.Pipe()
go func() {
for {
resp, err := ss.Recv()
if err != nil {
pw.CloseWithError(err)
return
}
if resp == nil && err == nil {
break
}
if _, werr := pw.Write(resp.Blob); werr != nil {
pw.CloseWithError(werr)
return
}
}
pw.Close()
}()
return &snapshotReadCloser{ctx: ctx, ReadCloser: pr}, nil
}
type snapshotReadCloser struct {
ctx context.Context
io.ReadCloser
}
func (rc *snapshotReadCloser) Read(p []byte) (n int, err error) {
n, err = rc.ReadCloser.Read(p)
return n, toErr(rc.ctx, err)
}
func (m *maintenance) MoveLeader(ctx context.Context, transfereeID uint64) (*MoveLeaderResponse, error) {
resp, err := m.remote.MoveLeader(ctx, &pb.MoveLeaderRequest{TargetID: transfereeID}, m.callOpts...)
return (*MoveLeaderResponse)(resp), toErr(ctx, err)
}
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package clientv3
import (
"math"
"time"
"google.golang.org/grpc"
)
var (
// client-side handling retrying of request failures where data was not written to the wire or
// where server indicates it did not process the data. gRPC default is default is "FailFast(true)"
// but for etcd we default to "FailFast(false)" to minimize client request error responses due to
// transient failures.
defaultFailFast = grpc.FailFast(false)
// client-side request send limit, gRPC default is math.MaxInt32
// Make sure that "client-side send limit < server-side default send/recv limit"
// Same value as "embed.DefaultMaxRequestBytes" plus gRPC overhead bytes
defaultMaxCallSendMsgSize = grpc.MaxCallSendMsgSize(2 * 1024 * 1024)
// client-side response receive limit, gRPC default is 4MB
// Make sure that "client-side receive limit >= server-side default send/recv limit"
// because range response can easily exceed request send limits
// Default to math.MaxInt32; writes exceeding server-side send limit fails anyway
defaultMaxCallRecvMsgSize = grpc.MaxCallRecvMsgSize(math.MaxInt32)
// client-side non-streaming retry limit, only applied to requests where server responds with
// a error code clearly indicating it was unable to process the request such as codes.Unavailable.
// If set to 0, retry is disabled.
defaultUnaryMaxRetries uint = 100
// client-side streaming retry limit, only applied to requests where server responds with
// a error code clearly indicating it was unable to process the request such as codes.Unavailable.
// If set to 0, retry is disabled.
defaultStreamMaxRetries = ^uint(0) // max uint
// client-side retry backoff wait between requests.
defaultBackoffWaitBetween = 25 * time.Millisecond
// client-side retry backoff default jitter fraction.
defaultBackoffJitterFraction = 0.10
)
// defaultCallOpts defines a list of default "gRPC.CallOption".
// Some options are exposed to "clientv3.Config".
// Defaults will be overridden by the settings in "clientv3.Config".
var defaultCallOpts = []grpc.CallOption{defaultFailFast, defaultMaxCallSendMsgSize, defaultMaxCallRecvMsgSize}
// MaxLeaseTTL is the maximum lease TTL value
const MaxLeaseTTL = 9000000000
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package clientv3
type SortTarget int
type SortOrder int
const (
SortNone SortOrder = iota
SortAscend
SortDescend
)
const (
SortByKey SortTarget = iota
SortByVersion
SortByCreateRevision
SortByModRevision
SortByValue
)
type SortOption struct {
Target SortTarget
Order SortOrder
}
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package clientv3
import (
"context"
"sync"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"google.golang.org/grpc"
)
// Txn is the interface that wraps mini-transactions.
//
// Txn(context.TODO()).If(
// Compare(Value(k1), ">", v1),
// Compare(Version(k1), "=", 2)
// ).Then(
// OpPut(k2,v2), OpPut(k3,v3)
// ).Else(
// OpPut(k4,v4), OpPut(k5,v5)
// ).Commit()
//
type Txn interface {
// If takes a list of comparison. If all comparisons passed in succeed,
// the operations passed into Then() will be executed. Or the operations
// passed into Else() will be executed.
If(cs ...Cmp) Txn
// Then takes a list of operations. The Ops list will be executed, if the
// comparisons passed in If() succeed.
Then(ops ...Op) Txn
// Else takes a list of operations. The Ops list will be executed, if the
// comparisons passed in If() fail.
Else(ops ...Op) Txn
// Commit tries to commit the transaction.
Commit() (*TxnResponse, error)
}
type txn struct {
kv *kv
ctx context.Context
mu sync.Mutex
cif bool
cthen bool
celse bool
isWrite bool
cmps []*pb.Compare
sus []*pb.RequestOp
fas []*pb.RequestOp
callOpts []grpc.CallOption
}
func (txn *txn) If(cs ...Cmp) Txn {
txn.mu.Lock()
defer txn.mu.Unlock()
if txn.cif {
panic("cannot call If twice!")
}
if txn.cthen {
panic("cannot call If after Then!")
}
if txn.celse {
panic("cannot call If after Else!")
}
txn.cif = true
for i := range cs {
txn.cmps = append(txn.cmps, (*pb.Compare)(&cs[i]))
}
return txn
}
func (txn *txn) Then(ops ...Op) Txn {
txn.mu.Lock()
defer txn.mu.Unlock()
if txn.cthen {
panic("cannot call Then twice!")
}
if txn.celse {
panic("cannot call Then after Else!")
}
txn.cthen = true
for _, op := range ops {
txn.isWrite = txn.isWrite || op.isWrite()
txn.sus = append(txn.sus, op.toRequestOp())
}
return txn
}
func (txn *txn) Else(ops ...Op) Txn {
txn.mu.Lock()
defer txn.mu.Unlock()
if txn.celse {
panic("cannot call Else twice!")
}
txn.celse = true
for _, op := range ops {
txn.isWrite = txn.isWrite || op.isWrite()
txn.fas = append(txn.fas, op.toRequestOp())
}
return txn
}
func (txn *txn) Commit() (*TxnResponse, error) {
txn.mu.Lock()
defer txn.mu.Unlock()
r := &pb.TxnRequest{Compare: txn.cmps, Success: txn.sus, Failure: txn.fas}
var resp *pb.TxnResponse
var err error
resp, err = txn.kv.remote.Txn(txn.ctx, r, txn.callOpts...)
if err != nil {
return nil, toErr(txn.ctx, err)
}
return (*TxnResponse)(resp), nil
}
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package clientv3
import (
"math/rand"
"reflect"
"runtime"
"strings"
"time"
)
// jitterUp adds random jitter to the duration.
//
// This adds or subtracts time from the duration within a given jitter fraction.
// For example for 10s and jitter 0.1, it will return a time within [9s, 11s])
//
// Reference: https://godoc.org/github.com/grpc-ecosystem/go-grpc-middleware/util/backoffutils
func jitterUp(duration time.Duration, jitter float64) time.Duration {
multiplier := jitter * (rand.Float64()*2 - 1)
return time.Duration(float64(duration) * (1 + multiplier))
}
// Check if the provided function is being called in the op options.
func isOpFuncCalled(op string, opts []OpOption) bool {
for _, opt := range opts {
v := reflect.ValueOf(opt)
if v.Kind() == reflect.Func {
if opFunc := runtime.FuncForPC(v.Pointer()); opFunc != nil {
if strings.Contains(opFunc.Name(), op) {
return true
}
}
}
}
return false
}
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package rpctypes has types and values shared by the etcd server and client for v3 RPC interaction.
package rpctypes
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rpctypes
var (
MetadataRequireLeaderKey = "hasleader"
MetadataHasLeader = "true"
)
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rpctypes
var (
TokenFieldNameGRPC = "token"
TokenFieldNameSwagger = "authorization"
)
syntax = "proto2";
package etcdserverpb;
import "gogoproto/gogo.proto";
option (gogoproto.marshaler_all) = true;
option (gogoproto.sizer_all) = true;
option (gogoproto.unmarshaler_all) = true;
option (gogoproto.goproto_getters_all) = false;
message Request {
optional uint64 ID = 1 [(gogoproto.nullable) = false];
optional string Method = 2 [(gogoproto.nullable) = false];
optional string Path = 3 [(gogoproto.nullable) = false];
optional string Val = 4 [(gogoproto.nullable) = false];
optional bool Dir = 5 [(gogoproto.nullable) = false];
optional string PrevValue = 6 [(gogoproto.nullable) = false];
optional uint64 PrevIndex = 7 [(gogoproto.nullable) = false];
optional bool PrevExist = 8 [(gogoproto.nullable) = true];
optional int64 Expiration = 9 [(gogoproto.nullable) = false];
optional bool Wait = 10 [(gogoproto.nullable) = false];
optional uint64 Since = 11 [(gogoproto.nullable) = false];
optional bool Recursive = 12 [(gogoproto.nullable) = false];
optional bool Sorted = 13 [(gogoproto.nullable) = false];
optional bool Quorum = 14 [(gogoproto.nullable) = false];
optional int64 Time = 15 [(gogoproto.nullable) = false];
optional bool Stream = 16 [(gogoproto.nullable) = false];
optional bool Refresh = 17 [(gogoproto.nullable) = true];
}
message Metadata {
optional uint64 NodeID = 1 [(gogoproto.nullable) = false];
optional uint64 ClusterID = 2 [(gogoproto.nullable) = false];
}
syntax = "proto3";
package etcdserverpb;
import "gogoproto/gogo.proto";
import "etcdserver.proto";
import "rpc.proto";
option (gogoproto.marshaler_all) = true;
option (gogoproto.sizer_all) = true;
option (gogoproto.unmarshaler_all) = true;
option (gogoproto.goproto_getters_all) = false;
message RequestHeader {
uint64 ID = 1;
// username is a username that is associated with an auth token of gRPC connection
string username = 2;
// auth_revision is a revision number of auth.authStore. It is not related to mvcc
uint64 auth_revision = 3;
}
// An InternalRaftRequest is the union of all requests which can be
// sent via raft.
message InternalRaftRequest {
RequestHeader header = 100;
uint64 ID = 1;
Request v2 = 2;
RangeRequest range = 3;
PutRequest put = 4;
DeleteRangeRequest delete_range = 5;
TxnRequest txn = 6;
CompactionRequest compaction = 7;
LeaseGrantRequest lease_grant = 8;
LeaseRevokeRequest lease_revoke = 9;
AlarmRequest alarm = 10;
AuthEnableRequest auth_enable = 1000;
AuthDisableRequest auth_disable = 1011;
InternalAuthenticateRequest authenticate = 1012;
AuthUserAddRequest auth_user_add = 1100;
AuthUserDeleteRequest auth_user_delete = 1101;
AuthUserGetRequest auth_user_get = 1102;
AuthUserChangePasswordRequest auth_user_change_password = 1103;
AuthUserGrantRoleRequest auth_user_grant_role = 1104;
AuthUserRevokeRoleRequest auth_user_revoke_role = 1105;
AuthUserListRequest auth_user_list = 1106;
AuthRoleListRequest auth_role_list = 1107;
AuthRoleAddRequest auth_role_add = 1200;
AuthRoleDeleteRequest auth_role_delete = 1201;
AuthRoleGetRequest auth_role_get = 1202;
AuthRoleGrantPermissionRequest auth_role_grant_permission = 1203;
AuthRoleRevokePermissionRequest auth_role_revoke_permission = 1204;
}
message EmptyResponse {
}
// What is the difference between AuthenticateRequest (defined in rpc.proto) and InternalAuthenticateRequest?
// InternalAuthenticateRequest has a member that is filled by etcdserver and shouldn't be user-facing.
// For avoiding misusage the field, we have an internal version of AuthenticateRequest.
message InternalAuthenticateRequest {
string name = 1;
string password = 2;
// simple_token is generated in API layer (etcdserver/v3_server.go)
string simple_token = 3;
}
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package etcdserverpb
import (
"fmt"
"strings"
proto "github.com/golang/protobuf/proto"
)
// InternalRaftStringer implements custom proto Stringer:
// redact password, replace value fields with value_size fields.
type InternalRaftStringer struct {
Request *InternalRaftRequest
}
func (as *InternalRaftStringer) String() string {
switch {
case as.Request.LeaseGrant != nil:
return fmt.Sprintf("header:<%s> lease_grant:<ttl:%d-second id:%016x>",
as.Request.Header.String(),
as.Request.LeaseGrant.TTL,
as.Request.LeaseGrant.ID,
)
case as.Request.LeaseRevoke != nil:
return fmt.Sprintf("header:<%s> lease_revoke:<id:%016x>",
as.Request.Header.String(),
as.Request.LeaseRevoke.ID,
)
case as.Request.Authenticate != nil:
return fmt.Sprintf("header:<%s> authenticate:<name:%s simple_token:%s>",
as.Request.Header.String(),
as.Request.Authenticate.Name,
as.Request.Authenticate.SimpleToken,
)
case as.Request.AuthUserAdd != nil:
return fmt.Sprintf("header:<%s> auth_user_add:<name:%s>",
as.Request.Header.String(),
as.Request.AuthUserAdd.Name,
)
case as.Request.AuthUserChangePassword != nil:
return fmt.Sprintf("header:<%s> auth_user_change_password:<name:%s>",
as.Request.Header.String(),
as.Request.AuthUserChangePassword.Name,
)
case as.Request.Put != nil:
return fmt.Sprintf("header:<%s> put:<%s>",
as.Request.Header.String(),
NewLoggablePutRequest(as.Request.Put).String(),
)
case as.Request.Txn != nil:
return fmt.Sprintf("header:<%s> txn:<%s>",
as.Request.Header.String(),
NewLoggableTxnRequest(as.Request.Txn).String(),
)
default:
// nothing to redact
}
return as.Request.String()
}
// txnRequestStringer implements a custom proto String to replace value bytes fields with value size
// fields in any nested txn and put operations.
type txnRequestStringer struct {
Request *TxnRequest
}
func NewLoggableTxnRequest(request *TxnRequest) *txnRequestStringer {
return &txnRequestStringer{request}
}
func (as *txnRequestStringer) String() string {
var compare []string
for _, c := range as.Request.Compare {
switch cv := c.TargetUnion.(type) {
case *Compare_Value:
compare = append(compare, newLoggableValueCompare(c, cv).String())
default:
// nothing to redact
compare = append(compare, c.String())
}
}
var success []string
for _, s := range as.Request.Success {
success = append(success, newLoggableRequestOp(s).String())
}
var failure []string
for _, f := range as.Request.Failure {
failure = append(failure, newLoggableRequestOp(f).String())
}
return fmt.Sprintf("compare:<%s> success:<%s> failure:<%s>",
strings.Join(compare, " "),
strings.Join(success, " "),
strings.Join(failure, " "),
)
}
// requestOpStringer implements a custom proto String to replace value bytes fields with value
// size fields in any nested txn and put operations.
type requestOpStringer struct {
Op *RequestOp
}
func newLoggableRequestOp(op *RequestOp) *requestOpStringer {
return &requestOpStringer{op}
}
func (as *requestOpStringer) String() string {
switch op := as.Op.Request.(type) {
case *RequestOp_RequestPut:
return fmt.Sprintf("request_put:<%s>", NewLoggablePutRequest(op.RequestPut).String())
case *RequestOp_RequestTxn:
return fmt.Sprintf("request_txn:<%s>", NewLoggableTxnRequest(op.RequestTxn).String())
default:
// nothing to redact
}
return as.Op.String()
}
// loggableValueCompare implements a custom proto String for Compare.Value union member types to
// replace the value bytes field with a value size field.
// To preserve proto encoding of the key and range_end bytes, a faked out proto type is used here.
type loggableValueCompare struct {
Result Compare_CompareResult `protobuf:"varint,1,opt,name=result,proto3,enum=etcdserverpb.Compare_CompareResult"`
Target Compare_CompareTarget `protobuf:"varint,2,opt,name=target,proto3,enum=etcdserverpb.Compare_CompareTarget"`
Key []byte `protobuf:"bytes,3,opt,name=key,proto3"`
ValueSize int `protobuf:"bytes,7,opt,name=value_size,proto3"`
RangeEnd []byte `protobuf:"bytes,64,opt,name=range_end,proto3"`
}
func newLoggableValueCompare(c *Compare, cv *Compare_Value) *loggableValueCompare {
return &loggableValueCompare{
c.Result,
c.Target,
c.Key,
len(cv.Value),
c.RangeEnd,
}
}
func (m *loggableValueCompare) Reset() { *m = loggableValueCompare{} }
func (m *loggableValueCompare) String() string { return proto.CompactTextString(m) }
func (*loggableValueCompare) ProtoMessage() {}
// loggablePutRequest implements a custom proto String to replace value bytes field with a value
// size field.
// To preserve proto encoding of the key bytes, a faked out proto type is used here.
type loggablePutRequest struct {
Key []byte `protobuf:"bytes,1,opt,name=key,proto3"`
ValueSize int `protobuf:"varint,2,opt,name=value_size,proto3"`
Lease int64 `protobuf:"varint,3,opt,name=lease,proto3"`
PrevKv bool `protobuf:"varint,4,opt,name=prev_kv,proto3"`
IgnoreValue bool `protobuf:"varint,5,opt,name=ignore_value,proto3"`
IgnoreLease bool `protobuf:"varint,6,opt,name=ignore_lease,proto3"`
}
func NewLoggablePutRequest(request *PutRequest) *loggablePutRequest {
return &loggablePutRequest{
request.Key,
len(request.Value),
request.Lease,
request.PrevKv,
request.IgnoreValue,
request.IgnoreLease,
}
}
func (m *loggablePutRequest) Reset() { *m = loggablePutRequest{} }
func (m *loggablePutRequest) String() string { return proto.CompactTextString(m) }
func (*loggablePutRequest) ProtoMessage() {}
This diff could not be displayed because it is too large.
This diff could not be displayed because it is too large.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment