toxiproxy介绍及源码分析
May 19, 2024
toxiproxy是Shopify开发的混动工程代理,可注入各种混动工程所需的异常,包括延迟/抖动、超时、下线、流量限制等,用于可用性测试等场景。类似nginx等反向代理,toxiproxy可以在多个指定端口上启动tcp监听服务(proxy),并建立到后端upstream的连接管道,以非代码侵入的方式来模拟混沌异常。特定proxy的异常策略(比如延时、超时、流量控制等)也可以灵活调整。
本文介绍其用法以及工作原理。
用法
# 启动server
» ./toxiproxy-server
# 新建proxy
» ./toxiproxy-cli create -l localhost:26379 -u localhost:6379 redis 130 ↵
Created new proxy redis
# 注入延迟异常(toxic)
» ./toxiproxy-cli toxic add -t latency -a latency=1000 redis
Added downstream latency toxic 'latency_downstream' on proxy 'redis'
# 客户端测试:从耗时上看已经生效了
» redis-cli -p 26379
127.0.0.1:26379> ping
PONG
(1.00s)
可见客户端唯一要做的事情就是将原始的upstream地址切换到toxiproxy中对应的proxy地址,没有其他代码层面的变化。可以通过服务发现等机制调度实现0代码入侵。
数据结构
下面通过 toxiproxy-server 代码分析其工作原理。
toxiproxy-server
toxiproxy服务端实例,所有流量的都会经过toxiproxy-server并经过一系列的toxic策略处理后转发给upstream,并将回包返回给client。其内部包括以下几个部分:
type ApiServer struct {
Collection *ProxyCollection // proxy collection
Metrics *metricsContainer // 指标收集
Logger *zerolog.Logger
http *http.Server // http API
}
- Collection:服务端所有proxy的集合,注意同一个proxy是可以被多个client共用的。
- metrics:用于Prometheus数据上报
- logger:日志
- http:提供管理接口。
http server
toxiproxy-server 提供了一个HTTP的API服务,提供服务相关的管控接口(不包括流量代理),包括管理(增删改查)proxy以及toxic。中间件包括:
1. 日志记录
2. 请求ID生成, "request_id"用于记录请求ID, "X-Toxiproxy-Request-Id"用于返回给客户端(response header)
3. 请求日志记录,记录client、method、url、user-agent、status、size、duration、handler等信息
4. 浏览器检测,如果UA是Mozilla/x.x.x,则返回403 —— 就是禁止浏览器访问的意思,这个方法有点暴力
5. 超时检测,如果请求超过25秒,则返回408 —— 请求超时
主要功能都是对proxy、toxic的 基本操作。
// 启动所有proxy。并且移除所有toxics
r.HandleFunc("/reset", server.ResetState).Methods("POST").Name("ResetState")
// list 所有proxy
r.HandleFunc("/proxies", server.ProxyIndex).Methods("GET").Name("ProxyIndex")
// 创建proxy
r.HandleFunc("/proxies", server.ProxyCreate).Methods("POST").Name("ProxyCreate")
// 批量添加或者更新(比如名称、状态)proxy
r.HandleFunc("/populate", server.Populate).Methods("POST").Name("Populate")
// 查看特定proxy
r.HandleFunc("/proxies/{proxy}", server.ProxyShow).Methods("GET").Name("ProxyShow")
// 更新proxy
r.HandleFunc("/proxies/{proxy}", server.ProxyUpdate).Methods("POST", "PATCH").Name("ProxyUpdate")
// 删除proxy
r.HandleFunc("/proxies/{proxy}", server.ProxyDelete).Methods("DELETE").Name("ProxyDelete")
// List active toxics
r.HandleFunc("/proxies/{proxy}/toxics", server.ToxicIndex).Methods("GET").Name("ToxicIndex")
// 创建toxic
r.HandleFunc("/proxies/{proxy}/toxics", server.ToxicCreate).Methods("POST").Name("ToxicCreate")
// 查看、更新、删除proxy下的toxic
r.HandleFunc("/proxies/{proxy}/toxics/{toxic}", server.ToxicShow).Methods("GET").Name("ToxicShow")
r.HandleFunc("/proxies/{proxy}/toxics/{toxic}", server.ToxicUpdate).Methods("POST", "PATCH").Name("ToxicUpdate")
r.HandleFunc("/proxies/{proxy}/toxics/{toxic}", server.ToxicDelete).Methods("DELETE").Name("ToxicDelete")
proxy
作用:
proxy时真正用于流量代理的模块,一个toxiproxy-server实例下可以存在多个proxy,为不同的upstream代理不同的流量。proxy在创建完毕之后就会一直在指定的TCP端口上监听客户端的连接请求,同时也需要响应来自信号的关闭请求,关闭时会清理已经开启的客户端链接并退出。
当client连接到toxicproxy中的某个proxy之后,proxy会创建两条TCP链接,分别为从client到 toxiproxy-server 以及从toxiproxy-server 到 upstream 的连接,并保存在connections中,其作用类似管道,用于代理转发client以及upstream之间的通信。同一proxy是可以同时响应多个客户端的流量代理的,所以会有一层 proxy collection 的存在,其目的就是维护proxy的集合(map)。其中key为proxy的名称(create时指定),value为proxy对象。
type ProxyCollection struct {
sync.RWMutex
proxies map[string]*Proxy
}
创建方式:
proxy可以通过API接口或者配置文件创建,比如下面的配置文件要求toxiproxy实例创建两个proxy,分别用作redis以及mysql的流量代理:
[
{
"name": "redis",
"listen": "[::]:16379",
"upstream": "127.0.0.1:6379",
"enabled": true
},
{
"name": "mysql",
"listen": "[::]:14355",
"upstream": "127.0.0.1:4355",
"enabled": true
}
]
数据结构:
type Proxy struct {
sync.Mutex
Name string `json:"name"`
Listen string `json:"listen"`
Upstream string `json:"upstream"`
Enabled bool `json:"enabled"`
listener net.Listener // 本地tcp监听listener
started chan error
tomb tomb.Tomb // 用于优雅退出时关闭proxy下的connection list
connections ConnectionList // 客户端连接+upstream连接列表。每一个客户端链接存在一个对应的upstream链接
Toxics *ToxicCollection `json:"-"` // 指向ToxicCollection对象
apiServer *ApiServer /// 指向server对象
Logger *zerolog.Logger
}
// 客户端&upstream连接
type ConnectionList struct {
list map[string]net.Conn
lock sync.Mutex
}
- listener:每个proxy都有一个单独的goroutine负责监听来自客户端的连接。连接建立后会创建一条与之对应的upstream连接。看着资源消耗较大,但toxiproxy毕竟是在tpc层上的proxy,所以没法做到像nginx类似的upstream共享的。
- tomb:用于proxy优雅退出。proxy启动后goroutine block在Accept操作上,没有办法优雅的通知其执行清理退出操作。所以引入了tomb库。参考这里。
toxic
proxy上如果没有挂载任何toxic,此时他的作用就是一条纯tcp的流量代理。为了实现混沌工程的效果,需要添加不同的toxic策略,比如延迟一段时间,或者流量控制等。
挂载在proxy之上的异常策略,区分多种类型,比如上面用法中的 latence 类型,目的就是让经过proxy的流量延时1秒后再返回给client。从而模拟redis繁忙的场景。各toxic采用factory model设计模式,会被注册到全局的ToxicRegistory对象管理,所以开发者也可以方便的实现自定义的toxic策略。
var (
// 存储所有注册的toxic指针
ToxicRegistry map[string]Toxic
registryMutex sync.RWMutex
)
// in toxics/latency.go
func init() {
Register("latency", new(LatencyToxic))
}
各个toxic采用链表的方式,挂载到proxy之上,作用于proxy下的各个connection。所以实现一个proxy仅需要关注其输入、输出即可,proxy会负责整个toxic链路的串联调度。注意toxic自身虽然封装了处理逻辑,但是它还是一个静态的对象,自身不负责调度执行,只有在链接创建完毕之后才会有创建对应的goroutine执行调度,同一toxic的执行在各链接之间相互隔离。下面会详细介绍。
下面看两个简单的toxic实现就大致知道其原理了。
noop toxic
NoopToxic 为toxic链表的首节点,他的作用非常简单,就是从Input读取数据实时的原样转给output。
- client → upstream:input就是来自客户端的链接,output就是从toxicproxy到upstream的链接。
- upstream → client: 与上面相反。
// The NoopToxic passes all data through without any toxic effects.
type NoopToxic struct{}
func (t *NoopToxic) Pipe(stub *ToxicStub) {
for {
select {
case <-stub.Interrupt:
return
case c := <-stub.Input:
if c == nil {
stub.Close()
return
}
stub.Output <- c
}
}
}
注意noop toxic是在proxy中一定会存在的,即使用户没有指定任何toxic策略。
latency toxic
LatencyToxic 的作用是在proxy中将流量hold一段时间,然后再转发给upstream或者client。从代码上看其实就是从Input接收到流量后select block一段时长(delay),然后再写入output。
func (t *LatencyToxic) Pipe(stub *ToxicStub) {
for {
select {
case <-stub.Interrupt:
return
case c := <-stub.Input:
if c == nil {
stub.Close()
return
}
sleep := t.delay() - time.Since(c.Timestamp)
select {
case <-time.After(sleep):
c.Timestamp = c.Timestamp.Add(sleep)
stub.Output <- c
case <-stub.Interrupt:
// Exit fast without applying latency.
stub.Output <- c // Don't drop any data on the floor
return
}
}
}
}
调度
从上面的数据结构可以看出,proxy仅仅是负责了client、upstream connection的管理,而toxic页仅仅是负责了其自身的数据流处理,此时还缺少一个发动机,将整体流程调度起来,整合connection(链接)与toxic(策略)的功能
ToxicCollection
ToxicCollection 是串联起整个流程的核心数据结构,每个proxy下都有一个ToxicCollection来管理已经激活其下的toxics。
type ToxicCollection struct {
noop *toxics.ToxicWrapper // 初始化创建noop
proxy *Proxy // 指向归属proxy
chain [][]*toxics.ToxicWrapper // upstream + downstream toxics的数组。第0项为noop,每增加一个toxic,都会在数组中append一个新的toxic
links map[string]*ToxicLink // 每条连接的建立会有两条link。分别作用downstream、upstream的数据流处理。
}
chain
chain主要记录proxy下toxic的配置,包括name、direction、buffersize等静态配置。toxic基本都是逻辑的执行,很少有数据存储的需求,所以在一个proxy的的各个toxic是所有链接共享的,存放在chain数组中。
增加toxic时需要指定direction,用于指定策略时生效于 client → upstream(upstream),还是upstream → client(downstream),所以chain也有两条,分别记录downstream和upstream的toxics数组。
links
ToxicLinks 是单向管道,通过一系列的 toxics将输入和输出连接起来。 该链条始终以 NoopToxic开始,并且随着 toxics 的启用或禁用而添加或移除。新的 toxics 总是添加到链条的末尾。
客户端连接建立完毕之后会在ToxicCollection建立两条link,分别用作upstream、downstream的数据流处理。名称分别为客户端地址+”upstream”/”downstream”。
type ToxicLink struct {
stubs []*toxics.ToxicStub
proxy *Proxy
toxics *ToxicCollection
input *stream.ChanWriter
output *stream.ChanReader
direction stream.Direction
Logger *zerolog.Logger
}
- stubs:封装toxic所需的input、output channel,以及toxic可能需要的连接相关的数据结构。比如流量统计需要记录连接下的累计总流量,这种跟随connection比较合适。
- input:指向ChanWriter对象,负责从source读取数据并丢给toxic链表首节点(即noop节点)。
- output:从链表中读取数据,直到满足BufferSize条件为止。
StartLink
在client连接建立成功之后(含upstream),会调用ToxicCollection.StartLink实现链接相关的toxic链条建立以及执行。
// proxy监听协程,处理客户端的proxy连接【去掉部分非核心代码】
func (proxy *Proxy) server() {
err := proxy.listen()
for {
// 会阻塞,等待客户端连接。后者listener被关闭
client, err := proxy.listener.Accept()
// 每accept新的链接,创建一个到upstream的对应的链接
upstream, err := net.Dial("tcp", proxy.Upstream)
name := client.RemoteAddr().String()
proxy.connections.list[name+"upstream"] = upstream
proxy.connections.list[name+"downstream"] = client
// 启动UP和DOWN链接的toxics link创建以及调度
proxy.Toxics.StartLink(proxy.apiServer, name+"upstream", client, upstream, stream.Upstream)
proxy.Toxics.StartLink(proxy.apiServer, name+"downstream", upstream, client, stream.Downstream)
}
可见每个客户端连接都有两条toxic链条,分别用作upstream、downstream的数据流处理。且其input和output正好相反。
数据流的处理包括一系列的goroutine,其主要逻辑就是从input读取 → toxic自身逻辑 → 写入output,即下一个toxic的input。包括:
link.read
逻辑很直接,一直从source(client → toxicproxy的链接)中读取数据,写入link.input对象。直到source链接关闭为止。
func (link *ToxicLink) read(
metricLabels []string,
server *ApiServer,
source io.Reader,
) {
// 会一直阻塞,直到source结束。过程中没有metrics数据?
bytes, err := io.Copy(link.input, source)
if server.Metrics.proxyMetricsEnabled() {
server.Metrics.ProxyMetrics.ReceivedBytesTotal.
WithLabelValues(metricLabels...).Add(float64(bytes))
}
fmt.Println("read from source end", bytes)
link.input.Close()
}
而link.inpu对象会被toxic链表的首个toxic处理,即noop toxic。noop toxic的逻辑不再赘述。
各toxic的执行
链接下的每个toxic,都会有一个独立的goroutine负责调度。其逻辑就是根据toxicity的比例,挑选部分流量执行toxic策略,未命中的流量直接使用noop toxic转给下一个toxic处理。
for i, toxic := range link.toxics.chain[link.direction] {
// 链接+toxic 相关的状态数据(通常toxic时proxy下的所有链接共享的,或者不需要私有数据)
if stateful, ok := toxic.Toxic.(toxics.StatefulToxic); ok {
link.stubs[i].State = stateful.NewState()
}
go link.stubs[i].Run(toxic)
}
// 调度toxic。每个toxic都有一个goroutine。根据Toxicity大小,toxic会随机执行。
func (s *ToxicStub) Run(toxic *ToxicWrapper) {
s.running = make(chan struct{})
defer close(s.running)
randomToxicity := rand.Float32() // #nosec G404 -- was ignored before too
if randomToxicity < toxic.Toxicity {
toxic.Pipe(s)
} else {
new(NoopToxic).Pipe(s)
}
}
link.write
流量处理的收尾,将最后一个chan的数据写入connection。