pREST源码分析

August 22, 2024

之前的文章中已经分析了pREST的功能和基础用法 pREST功能简介 ,本文介绍其代码结构以及实现原理。重点关注其CRUD能力以及自定义查询的实现方案。

整体结构

项目采用golang实现,技术栈包括:

  • cobra:golang的CLI框架。因为会支持多个子命令,包括主服务监听、db migration等。
  • negroni:中间件管理,包括日志、鉴权等公共中间件,会影响所有路由。
  • mux:路由管理。
  • bolddb:缓存模块,用于缓存GET请求时的db查询结果,加速查询。

项目代码结构如下:

[text]
├── adapters // 封装postgres逻辑 
├── cache    // 支持缓存,使用 buntdb 缓存API处理结果,加速查询
├── cmd      // 子命令
├── config   // 配置解析
├── context  // context管理
├── controllers   // 路由处理,包括元数据、表数据CRUD等入口
├── middlewares   // 中间件
├── plugins    // plugin处理逻辑
├── router  // 路由注册
├── scripts   //  docker脚本等
├── template   // 自定义查询中支持的函数实现
├── testdata  // 测试数据

本文仅关注pREST server部分的实现。

启动流程

  1. 配置文件加载及解析

    项目使用了viper来管理配置,支持配置文件或者环境变量的方式来设置配置。详见Configuring pREST,代码详见 config.go 。配置解析完毕后保存在全局变量中供后续访问。

  2. 公共中间件配置:使用negroni来管理中间件,部分中间件可以根据配置开关决定是否启用,公共中间件包括:
    1. negroni自带基础中间件,包括:NewRecovery、NewLogger
    2. HandlerSet中间件:用来根据相应格式参数 _renderer 将应答转换为json或者xml格式。
    3. SetTimeoutToContext:设置超时时间到context。后续处理时使用。
    4. CORS中间件:用于处理跨域相关配置。
    5. JWT验证中间件:校验 JWT 令牌,会放过 PREST_JWT_WHITELIST 中的请求。验证失败时对请求进行拦截。
    6. 缓存读取中间件:启用时会优先从buntdb读取缓存结果。命中则直接返回。
    7. 禁用Expose中间件:可以禁止部分list database、schema、tables的能力。
  3. 路由注册:注册API,按功能可以分为以下几类:
    • 用户认证:提供登录以及获取auth token能力
    • 元数据列表:包括db、schema、tables列表拉取,以及表结构拉取
    • 自定义查询
    • CRUD相关接口
    • 插件化能力:用于可以编写so挂载到pREST服务下,不推荐。
[go]
// 用StrictSlash选项。这意呀着如果路由路径以斜杠/结尾,而请求没有以斜杠结尾,或者相反,路由器将尝试重定向请求到正确的路径(如果可能)。
	router := mux.NewRouter().StrictSlash(true)

// 用户认证
if config.PrestConf.AuthEnabled {
	// 从数据库中获取用户名和密码,成功则生成JWT token
	// 注意:DB的操作方式,虽然使用adater封装了一层,但依然需要上传拼装sql。解耦不彻底。
	router.HandleFunc("/auth", controllers.Auth).Methods("POST")
}

// 数据库列表: 从pg_database中获取数据库列表
router.HandleFunc("/databases", controllers.GetDatabases).Methods("GET")
// schema列表:从information_schema.schemata中查询 schema_name
router.HandleFunc("/schemas", controllers.GetSchemas).Methods("GET")
// table列表:从 pg_catalog.pg_class、pg_catalog.pg_namespace 获取表名
router.HandleFunc("/tables", controllers.GetTables).Methods("GET")

// 自定义查询:从文件或脚本中获取查询语句,并根据参数解析后执行。返回执行结果
router.HandleFunc("/_QUERIES/{queriesLocation}/{script}", controllers.ExecuteFromScripts)

// 插件执行
if runtime.GOOS != "windows" {
	router.HandleFunc("/_PLUGIN/{file}/{func}", plugins.HandlerPlugin)
}
// database/schema下的table列表(可以与上面的table列表合并?)
router.HandleFunc("/{database}/{schema}", controllers.GetTablesByDatabaseAndSchema).Methods("GET")
// 数据库/schema下的table结构
router.HandleFunc("/show/{database}/{schema}/{table}", controllers.ShowTable).Methods("GET")

// 在 / 路径下,注册 CRUD 子路由??
crudRoutes := mux.NewRouter().PathPrefix("/").Subrouter().StrictSlash(true)

// 健康检查
router.HandleFunc("/_health", controllers.WrappedHealthCheck(controllers.DefaultCheckList)).Methods("GET")

// table CRUD操作
crudRoutes.HandleFunc("/{database}/{schema}/{table}", controllers.SelectFromTables).Methods("GET")
crudRoutes.HandleFunc("/{database}/{schema}/{table}", controllers.InsertInTables).Methods("POST")
crudRoutes.HandleFunc("/batch/{database}/{schema}/{table}", controllers.BatchInsertInTables).Methods("POST")
crudRoutes.HandleFunc("/{database}/{schema}/{table}", controllers.DeleteFromTable).Methods("DELETE")
crudRoutes.HandleFunc("/{database}/{schema}/{table}", controllers.UpdateTable).Methods("PUT", "PATCH")

// 因为下面几个中间件仅针对 CRUD 操作生效
router.PathPrefix("/").Handler(negroni.New(
	middlewares.ExposureMiddleware(),
	middlewares.AccessControl(),
	middlewares.AuthMiddleware(),
	middlewares.CacheMiddleware(&config.PrestConf.Cache),
	// plugins middleware
	plugins.MiddlewarePlugin(),
	negroni.Wrap(crudRoutes), // 仅针对 CRUD 操作生效
))

Auth

pREST提供了一个auth接口,用于根据用户名和密码获取Auth Token。后续接口访问时需要携带Auth Token进行身份认证。

pREST仅支持用户名、密码的认证,不支持第三方的OAuth进行认证,所以整体实现比较简单。主要流程包括:

  1. 从参数中获取用户名、密码。支持两种携带方式,之前的文章中已经有介绍。
  2. 从DB的用户表中取用户名、密码进行比对。匹配失败则返回。
  3. 调用go-jose生成JWT token。用于后续请求的身份认证。
[go]
func Token(u auth.User) (t string, err error) {
	// add start time (NotBefore)
	getToken := time.Now()
	// add expiry time in configuration (in minute format, so we support the maximum need)
	expireToken := time.Now().Add(time.Hour * 6)

	// TODO: JWT any Algorithm support
	sig, err := jose.NewSigner(
		jose.SigningKey{
			Algorithm: jose.HS256,
			Key:       []byte(config.PrestConf.JWTKey)},
		(&jose.SignerOptions{}).WithType("JWT"))
	if err != nil {
		return
	}

	cl := auth.Claims{
		UserInfo:  u,
		NotBefore: jwt.NewNumericDate(getToken),
		Expiry:    jwt.NewNumericDate(expireToken),
	}
	return jwt.Signed(sig).Claims(cl).CompactSerialize()
}

JWT的验证逻辑位于上面的中间件AuthMiddleware中。

CRUD操作

CRUD的大致原理就是根据请求filter中的各项参数,拼装成数据库的SQL语句,然后返回执行结果给到client端。

以表查询操作的执行逻辑为例分析其实现:

SQL拼接

  • 对应的路由:
[go]
crudRoutes.HandleFunc("/{database}/{schema}/{table}", controllers.SelectFromTables).Methods("GET")
  • FieldsPermissions:请求字段解析和字段权限过滤

    请求字段解析:API中通过 _select 参数携带各请求字段,代码中需要对其解析并格式化为SQL中所需的 "field1","field2" 格式。另外如果有分组参数 _groupby ,需要对字段中的分组条件进行转换,API参数 sum:{field name} 需要转换为SQL中的 sum("field name")

    字段权限过滤:配置文件可以控制每个表、字段是否开放对外的 select 、insert、delete 权限。查询时会对未开放权限的表、字段进行过滤or拦截。详见 restrict-mode

  • 利用SQL:利用上面解析后的字段列表和URL Path中解析出的表相关信息( /{database}/{schema}/{table),拼接出SQL语句中的 SELECT fields FROM db.schema.table 部分。
  • DistinctClause:如果参数中有指定需要去重,即携带参数 _distinct=true ,需要在field前增加distinct约束 SELECT distinct fields FROM db.schema.table
  • CountByRequest:如果有指定count参数 _count={field name} ,需要将对应的字段添加到select语句中。

    count、distint为何跟其他聚合函数(如SUM等)的携带方式不同?猜测是历史原因,导致参数处理方式存在差异。

  • JoinByRequest:如果有join条件,需要构造对应的join语句。
    [go]
    // 构造join语句,格式如: `left join on tab1.field1 = tab2.field2`
    joinQuery := fmt.Sprintf(` %s JOIN "%s" ON "%s"."%s" %s "%s"."%s" `, strings.ToUpper(joinArgs[0]), joinArgs[1], spl[0], spl[1], op, splj[0], splj[1])
  • WhereByRequest:根据filter条件中指定的字段过滤条件( {FIELD NAME}={VALUE} )构造where子句。需要处理多种查询操作符,支持对JSONB字段和全文搜索(tsquery)的特定处理,以及标准的SQL比较操作符。
    [go]
    // 根据操作符不同,拼接占位符和值
    switch op {
    	case "IN", "NOT IN":
    		v := strings.Split(value, ",")
    		keyParams := make([]string, len(v))
    		for i := 0; i < len(v); i++ {
    			whereValues = append(whereValues, v[i])
    			keyParams[i] = fmt.Sprintf(`$%d`, pid+i)
    		}
    		pid += len(v)
    		whereKey = append(whereKey, fmt.Sprintf(`%s %s (%s)`, key, op, strings.Join(keyParams, ",")))
    	case "ANY", "SOME", "ALL":
    		whereKey = append(whereKey, fmt.Sprintf(`%s = %s ($%d)`, key, op, pid))
    		whereValues = append(whereValues, formatters.FormatArray(strings.Split(value, ",")))
    		pid++
    	case "IS NULL", "IS NOT NULL", "IS TRUE", "IS NOT TRUE", "IS FALSE", "IS NOT FALSE":
    		whereKey = append(whereKey, fmt.Sprintf(`%s %s`, key, op))
    	default: // "=", "!=", ">", ">=", "<", "<="
    		whereKey = append(whereKey, fmt.Sprintf(`%s %s $%d`, key, op, pid))
    		whereValues = append(whereValues, value)
    		pid++
    }
  • GroupByClause:如果参数中有指定group by参数 _groupby,SQL中需要追加对应的group by条件,以及对应的having过滤条件。
  • OrderByRequest:如果参数中有指定order by参数 _order ,SQL中需要追加order by条件。
  • PaginateIfPossible:根据分页参数 _page_page_size追加分页条件,对应SQL中的 LIMIT OFFSET部分。

以上就是完整的SQL拼接过程,本质就是将各参数转换为对应的SQL。包括:

参数对应SQL
_page={set page number}翻页参数,对应 LIMIT
_page_size={number to return by pages}翻页参数,对应 OFFSET
?_select={field name 1},{fiel name 2}字段参数,对应SELECT中的字段选择
?_count={field name}字段参数,会转换为count聚合函数
?_count_first=truecount的特殊形态,要求直接返回对象,而非数组。其他没有差别。
?_renderer=xml控制最终返回格式,默认json,否则为xml
?_distinct=true字段参数,会转换为distinct聚合函数
?_order={FIELD}排序,对应 ORDER By
?_groupby={FIELD}分组,对应 GROUP BY
?{FIELD NAME}={VALUE}过滤条件,对应 WHERE中的各种条件,包括操作符。

执行查询

有了拼接好的sql,执行就比较简单了。pREST中DB操作是通过 sqlx 完成,此lib在标准库 database/sql 做了一些封装,便于使用。

此外为了支持多个数据库,pREST内还做了一层connection的封装,以维护各DB的独立连接。

执行分为两步,prepare & execute。可以直接看代码:

[go]
func (adapter *Postgres) QueryCtx(ctx context.Context, SQL string, params ...interface{}) (sc adapters.Scanner) {
	// use the db_name that was set on request to avoid runtime collisions
	db, err := getDBFromCtx(ctx)
	if err != nil {
		log.Errorln(err)
		return &scanner.PrestScanner{Error: err}
	}
	SQL = fmt.Sprintf("SELECT %s(s) FROM (%s) s", config.PrestConf.JSONAggType, SQL)
	log.Debugln("generated SQL:", SQL, " parameters: ", params)
	p, err := Prepare(db, SQL)
	if err != nil {
		log.Errorln(err)
		return &scanner.PrestScanner{Error: err}
	}
	var jsonData []byte
	err = p.QueryRowContext(ctx, params...).Scan(&jsonData)
	if len(jsonData) == 0 {
		jsonData = []byte("[]")
	}
	return &scanner.PrestScanner{
		Error:   err,
		Buff:    bytes.NewBuffer(jsonData),
		IsQuery: true,
	}
}

可以看到在执行之前,在拼接的SQL以上还封装了一次SQL,其目的是利用pg数据库中的 json_aggjsonb_agg 来实现输出格式的转换,而非代码中自己进行格式化输出。

最后,会根据配置参数决定是否将缓存结果写入 buntDB,以供后续查询读取。

自定义查询

从上面的SQL拼接过程也可以看出,通过参数来拼接复杂的查询是比较难以实现的,比如超过2张表以上的关联。

[text]
/{DATABASE}/{SCHEMA}/{TABLE}?_join={TYPE}:{TABLE JOIN}:{TABLE.FIELD}:{OPERATOR}:{TABLE JOIN.FIELD}

上面的 _join 参数是两张表的关联条件,里面其实拆分成了5个部分,对应SQL中的 left join on tab1.field1 = tab2.field2 各个部分了。如果再扩展到3张表,那参数基本很难理解了。

所以pREST基于golang的文本模板能力,支持了自定义查询。即用户编写SQL模板(里面包含了参数占位符),然后通过制定版本以及携带请求参数的方式,拼接成执完整SQL后执行的方式来解决复杂查询的问题。注意模板的后缀需要与约定的规范一致。

HTTP VerbSuffix
GET.read.sql
POST.write.sql
PUT, PATCH.update.sql
DELETE.delete.sql

本质的原因是会影响pREST获取执行结果的方式,比如对应insert场景,需要获取插入记录数。而pREST是不理解SQL的具体内容的。

模板解析

目的:读取模板文件,然后解析并将参数占位符替换为http请求中的各项实际参数值。

流程分析:

  • 查找脚本文件路径。会根据 脚本路径+脚本名称+HTTP Method 查找脚本文件。
  • 参数提取:从header、query中提取参数,自定义查询支持两种携带参数方式。
  • ParseScript:解析脚本,模板引擎这里使用了golang标注库的 text/template ,所以改库中的各种function理论上这里都是可以使用的。详见 https://pkg.go.dev/text/template#hdr-Functions

    此外pREST自己也扩展了一些函数,包括:

    [go]
    func (fr *FuncRegistry) RegistryAllFuncs() (funcs template.FuncMap) {
    	funcs = template.FuncMap{
    		"isSet":          fr.isSet,
    		"defaultOrValue": fr.defaultOrValue,
    		"inFormat":       fr.inFormat,
    		"unEscape":       fr.unEscape,
    		"split":          fr.split,
    		"limitOffset":    fr.limitOffset,
    	}
    	return
    }

最后就是根据参数和模板相结合,输出完整的SQL。

[go]
funcs := &template.FuncRegistry{TemplateData: templateData}
tpl := gotemplate.New(tplName).Funcs(funcs.RegistryAllFuncs())

// 解析 SQL 模板文件
tpl, err = tpl.ParseFiles(scriptPath)
if err != nil {
	err = fmt.Errorf("could not parse file %s: %v", scriptPath, err)
	return
}

// 执行模板文件
var buff bytes.Buffer
err = tpl.Execute(&buff, funcs.TemplateData)
if err != nil {
	err = fmt.Errorf("could not execute template %v", err)
	return
}

// 返回解析后的 SQL
sqlQuery = buff.String()

SQL执行

最后的SQL执行会其他无异,需要注意的是会根据http method做区分处理。因为对于select场景,只需要直接返回DB结果,而对于写入场景,需要获取 RowsAffected 值给到client端。

元数据拉取

pREST还提供几个元数据的拉取接口,包括:

  • /database :拉取所有DB列表。
  • /schemas :拉取schemas列表
  • /tables :拉取tables列表
  • /show/{DATABASE}/{SCHEMA}/{TABLE} :查看表包含字段详情

这几个接口的底层实现实际也是会转换为SQL查询,pREST没有维护自己的表结构,所以最终会转换为对pg的系统表查询,包括pg_database、information_schema.schemata、pg_catalog.pg_class、pg_catalog.pg_namespace 等。此处就不一一介绍了。

其他

一些在阅读源码前记录的问题,以及阅读完毕后的答案。

  1. 路由定义有些随意,容易误伤,感觉也缺乏规范。

    路由划分不是很清晰,restful的方式定义接口path上没有区分。比如查询表 /{DATABASE}/{SCHEMA} 、查询数据 /{DATABASE}/{SCHEMA}/{TABLE} 很容易弄混。应该按照directus的方式在一级path上就做好划分。

  2. 参数的定义方式比较潦草
    1. 所有查询参数通过get参数传递,这样就没办法定义复杂的形态,只能通过在value上做一些分隔符之类的手段,限制了灵活性。
    2. 参数基本与sql中的各个部分意义对应,感觉应该是自己拼接sql。但是也支持join

    CRUD的操作确实是手工拼接sql。从query、body中提取各个部分的参数,以select为例,包括

    • select field,包括原始字段 和 聚合函数(字段)另种模式
    • from table
    • join: join条件,包括 join table on a.field operator b.field
    • where: 各种field + operator + 数据组成的条件
    • group by & having :分组条件
    • order by:排序

    各个部分的sql构建是依赖自己的参数,最后拼接位完整的sql。所以也很难验证有相关性的参数场景。

  3. 没有自己的数据模型存储,这样就只能完全对齐pg的特性。

    是的,元数据也是直接从db系统表(pg_database、information_schema.schemata、pg_catalog.pg_class、pg_catalog.pg_namespace等)获取。优点是简单,缺点是校验能力较低。

  4. 只能构建 and 条件查询?

    是,参数方式限制,没办法做到像directus那样支持逻辑运算

  5. 为何adaper执行SQL一定要使用pg的json_agg、jsonb_agg来输出json数据?golang中不能自己输出自己的json吗?

    可以自己输出。比如对count_first的处理,就是scan到struct中后再转换为json。一定是需要struct定义吗?

总结

pREST作为一个为postgres提供REST接口的服务,整体实现逻辑比较清晰,功能也很完善,简单的CRUD可以直接通过其API实现,复杂需求也可以通过自定义SQL实现,基本可以覆盖各类业务场景。但是相对于directus等项目,没有系统配置&前端配置等基础能力,其校验能力相对比较匮乏,依赖业务的自行保障或者依赖DB的报错信息。所以对于稍微复杂的写入操作(比如连表的写入),也需要通过自定义SQL实现。

 

See all postsSee all posts