openresty+Lua 验签、调用限流、埋点实战
功能描述
1、对用户的请求进行签名验证,放行合法请求 2、所有请求里,根据appkey查询uid,将uid带给后端使用,降低后端查库压力 3、 可以针对每个appkey、每个接口进行调用次数限制
实现逻辑
# 前置:我们已经将MySQL的某个用户表直接实时同步到了redis中,使用hash存储 1、提取uri中的appkey参数 2、利用appkey查询Redis中存储的uid,放入header头。如果查询不到uid直接返回错误 3、利用appkey查询Redis中存储的secret生成签名,对比用户的签名,完成签名认证,签名通过则放行整个请求到后端 4、利用redis的数值加减功能,完成调用次数加减 5、解析请求,整理埋点数据发送到kafka,用于大数据离线分析 生产环境使用结果: 1G内存redis主从版qps就能轻松破万, 万级qps下cpu使用率20%左右
vhosts文件引入lua
server {
server_name openapiv2.itgod.org;
.. 省略
# rewrite阶段引入lua,获取用户UID
rewrite_by_lua_file conf/lua/scripts/openapiv2_uid.lua;
# Location matching the endpoints that require signature verification
location ~ ^/(open-api/a|open-api/b){
# Access phase: run the Lua script that verifies the request signature
access_by_lua_file conf/lua/scripts/openapiv2.lua;
proxy_pass http://$openapipassto;
}
.. 省略
openapiv2_uid.lua文件(功能: 查询UID,整理请求为埋点信息推送到kafka)
-- Rewrite-phase script: looks up the uid stored in Redis for the request's
-- appkey and forwards it in a "uid" header.
local cjson = require "cjson"
local ngx_log = ngx.log
local ngx_err = ngx.ERR
local redis = require "resty.redis"
local uri = ngx.var.uri
-- One Redis client object per request.
local red = redis:new()
-- Query-string arguments of the current request.
local args = ngx.req.get_uri_args()
local ngx_say = ngx.say
local appkey = args["appkey"]
-- Prefix of the Redis hash keys; the per-application key is built as
-- mdw_prefix .. ':' .. appkey.
local mdw_prefix = "mysql_to_redis_node1:dtk_users_application"
-- Accept both spellings of the parameter: "appkey" first, then "appKey".
if not appkey then
    appkey = args["appKey"]
end
-- Requests without a usable appkey skip the uid lookup and are passed on.
-- ngx.req.get_uri_args() yields `true` for a valueless argument and a table
-- for a repeated one; neither can be concatenated into a Redis key, so
-- anything that is not a non-empty string is treated like a missing appkey.
if type(appkey) ~= "string" or appkey == "" then
    -- Drop any client-supplied "uid" header so a request that skips the
    -- lookup cannot present a self-chosen uid to the backend.
    ngx.req.clear_header("uid")
    ngx.exit(0)
    -- Add a default key (disabled)
    --local cdnignore = ngx.req.get_headers(1)
    --if cdnignore then
    --ngx_log(ngx_err, "=====" .. cdnignore["cdn-ignore"])
    --ngx_log(ngx_err, "=====wu")
    --args["appKey"] = '61a622c6501'
    --ngx.req.set_uri_args(args)
    --args = ngx.req.get_uri_args()
    --appkey = args["appKey"]
    --end
end
--- Send `mess` as a JSON response body and finish the current handler.
-- @param mess table that is JSON-encoded and written to the client
-- @param s    optional HTTP status code; 200 is used when it is nil
local mess_return = function(mess, s)
    local status = s
    if status == nil then
        status = 200
    end
    ngx.status = status
    ngx_say(cjson.encode(mess))
    -- Hand the Redis connection back to the keepalive pool before leaving.
    red:set_keepalive(100000, 50)
    ngx.exit(0)
end
--- Look up the uid stored for the current appkey.
-- Reads field "uid" of the hash  mdw_prefix .. ':' .. appkey.
-- A single-field hget is used instead of hmget: hmget always returns a
-- (truthy) table, holding ngx.null when the field or key is missing, so a
-- plain truthiness test could never detect an unknown appkey.
-- @return the uid string; on a miss or Redis error the request is answered
--         with status 439 through mess_return and this function does not return
local find_uid = function()
    local res, err = red:hget(mdw_prefix .. ':' .. appkey, "uid")
    if type(res) == "string" then
        -- Lookup finished: return the connection to the keepalive pool.
        red:set_keepalive(100000, 50)
        return res
    end
    if not res then
        ngx_log(ngx_err, "redis hget uid failed: ", err)
    end
    local mess = {}
    mess["message"] = "appkey不存在"
    mess_return(mess, 439)
end
ngx.header['Content-Type'] = 'application/json; charset=utf-8'
-- Redis setup: 100 ms timeout for connect/send/read.
red:set_timeout(100)
local red_option = {}
red_option["pool_size"] = 50
red_option["backlog"] = 100
local ok, err = red:connect("xxxxx.redis.rds.aliyuncs.com", 6379, red_option)
if not ok then
    -- The request is still passed on when Redis is unreachable, but the
    -- failure is logged and a client-supplied "uid" header is removed so it
    -- cannot stand in for the value this script would have set.
    ngx_log(ngx_err, "redis connect failed, uid lookup skipped: ", err)
    ngx.req.clear_header("uid")
    ngx.exit(0)
end
-- uid: forward it to the backend as a request header and expose it as a
-- response header.
local uid = find_uid()
ngx.req.set_header("uid", uid)
ngx.header["uid"] = uid
--- BI: event-tracking data sent to Kafka
-- NOTE(review): `cjson` and `uri` are already declared as locals earlier in
-- this script; these declarations shadow them, and `uri` now holds
-- ngx.var.request_uri instead of ngx.var.uri.
local producer = require "resty.kafka.producer"
local cjson = require "cjson"
local uri = ngx.var.request_uri
local x_forwarded_for = ngx.var.http_x_forwarded_for
--local user_agent = ngx.var.http_user_agent
--- Build a tracking record for the current request and queue it for Kafka.
-- The record holds the API path, the appkey, the X-Forwarded-For value, the
-- full request URI, a copy of the query arguments and a local timestamp.
-- It is JSON-encoded and handed to an async lua-resty-kafka producer on
-- topic "test"; a failure to queue is logged and otherwise ignored so that
-- tracking never blocks the request.
local biToKafka = function()
    local producer = require "resty.kafka.producer"
    local broker_list = {
        { host = "192.168.13.45", port = 9092 },
        { host = "192.168.13.46", port = 9092 },
        { host = "192.168.13.47", port = 9092 }
    }
    -- Copy of the query arguments, embedded as-is in the record.
    local params = {}
    for k, v in pairs(args) do
        params[k] = v
    end
    local message = {}
    message["apiName"] = ngx.var.uri
    -- Use the resolved appkey so requests that spell the parameter "appKey"
    -- are recorded too (args["appkey"] is nil for them).
    message["appKey"] = appkey
    message["flag"] = 0
    message["ip"] = ngx.var.http_x_forwarded_for
    message["link"] = ngx.var.request_uri
    message["requestTime"] = os.date("%Y-%m-%d %H:%M:%S", os.time()) .. ".000"
    message["params"] = params
    local bp = producer:new(broker_list, { producer_type = "async" })
    -- No partition key is passed (the original passed the undefined global
    -- `null`, which evaluates to nil).
    local ok, err = bp:send("test", nil, cjson.encode(message))
    if not ok then
        ngx_log(ngx_err, "failed to queue BI message for kafka: ", err)
    end
end
biToKafka()
openapiv2.lua文件(功能:查询secret,并验证签名和调用次数限制)
-- Access-phase script: verifies the request signature and enforces the
-- per-appkey call quota kept in Redis.
local cjson = require "cjson"
local ngx_log = ngx.log
local ngx_err = ngx.ERR
local redis = require "resty.redis"
local uri = ngx.var.uri
local red = redis:new()
-- Query-string arguments of the current request.
local args = ngx.req.get_uri_args()
local ngx_say = ngx.say
-- Inputs of the signRan signature scheme.
local timer = args["timer"]
local nonce = args["nonce"]
local appkey = args["appkey"]
local keywords = {"总", "接口"}
-- Prefix of the Redis hash keys for application records and counters.
local mdw_prefix = "mysql_to_redis_node1:dtk_users_application"
-- Offset added to a quota when the daily counter is initialised; a counter
-- that has dropped below this value is treated as exhausted further down.
local ill_count = 1000000000
-- Date string used as the suffix of the daily counter key.
local today_sub = ngx.today()
ngx.header['Content-Type'] = 'application/json; charset=utf-8'
--- Send `mess` as a JSON response body and finish the current handler.
-- @param mess table that is JSON-encoded and written to the client
-- @param s    optional HTTP status code; 200 is used when it is nil
local mess_return = function(mess, s)
    local status = s
    if status == nil then
        status = 200
    end
    ngx.status = status
    ngx_say(cjson.encode(mess))
    -- Hand the Redis connection back to the keepalive pool before leaving.
    red:set_keepalive(100000, 100)
    ngx.exit(0)
end
-- Accept both spellings of the parameter: "appkey" first, then "appKey".
if not appkey then
    appkey = args["appKey"]
end
--if not appkey or not timer or not nonce then
-- ngx.req.get_uri_args() yields `true` for a valueless argument and a table
-- for a repeated one; both would raise an error when concatenated into the
-- Redis key below, so only a non-empty string counts as a supplied appkey.
if type(appkey) ~= "string" or appkey == "" then
    local mess = {}
    mess["message"] = "appkey必要参数缺失,请检查"
    mess_return(mess, 200)
end
-- Daily counter hash of this appkey: <prefix>:<appkey>:Nginx:<date>
local redis_key = mdw_prefix .. ":" .. appkey .. ":" .. "Nginx:" .. today_sub
--- Fetch field "app_secret" of the hash  mdw_prefix .. ':' .. appkey.
-- @return the hget reply when it is truthy, nothing otherwise
-- NOTE(review): lua-resty-redis documents ngx.null (a truthy value) for a
-- Redis nil reply, so a missing secret is presumably returned as ngx.null
-- rather than nil — confirm that callers handle it.
local find_appsecret = function()
    -- An earlier variant fetched "app_secret" and "uid" together via hmget.
    local secret = red:hget(mdw_prefix .. ':' .. appkey, "app_secret")
    if not secret then
        return
    end
    return secret
end
--- Decrement one field of the daily counter hash by 1.
-- @param field name of the counter field inside redis_key
-- @return the hincrby reply (the value after the decrement), or nothing
--         when the reply is nil/false
local appkey_decr_hset = function(field)
    local remaining = red:hincrby(redis_key, field, -1)
    if remaining then
        return remaining
    end
    return
end
--- Compute the expected "signRan" signature for the current request.
-- The signature is the upper-cased MD5 of
--   appKey=<appkey>&timer=<timer>&nonce=<nonce>&key=<appsecret>
-- A `timer` (milliseconds) older than 600000 ms, i.e. 10 minutes, is answered
-- directly with status 479 and this function does not return.
-- @param appsecret secret stored for the appkey
-- @return the expected signature string, or `false` when any input is
--         missing or not a string, or when `timer` is not numeric.
--         `false` can never equal a query-string value; the previous fixed
--         placeholder string could be sent by a client as signRan (while
--         omitting timer/nonce) to pass the comparison.
local gen_md5token = function(appsecret)
    if type(appkey) ~= "string" or type(timer) ~= "string"
        or type(nonce) ~= "string" or type(appsecret) ~= "string" then
        return false
    end
    local request_ms = tonumber(timer)
    if not request_ms then
        -- Non-numeric timer: treat as a signature failure, not a crash.
        return false
    end
    local oldest_allowed_ms = os.time() * 1000 - 600000
    if request_ms < oldest_allowed_ms then
        local mess = {}
        mess["message"] = "signRan已超时,请重新生成签名后再尝试请求"
        mess_return(mess, 479)
    end
    local authurl = "appKey=" .. appkey .. "&timer=" .. timer .. "&nonce=" .. nonce .. "&key=" .. appsecret
    return string.upper(ngx.md5(authurl))
end
--- Compute the expected legacy "sign" signature for the current request.
-- All query arguments except "sign" are sorted by name, joined as
-- name=value pairs with "&", followed by "key=<appsecret>"; the result is
-- the upper-cased MD5 of that string.
-- @param appsecret secret stored for the appkey
-- @return the expected signature string, or `false` when the appkey/secret
--         is unusable or an argument has no plain string value.
--         `false` is used because it can never equal a query-string value
--         (a missing "sign" is nil, so nil would compare equal), and the
--         previous fixed placeholder string could be sent by a client.
local gen_md5token_old = function(appsecret)
    if type(appkey) ~= "string" or type(appsecret) ~= "string" then
        return false
    end
    local names = {}
    for name in pairs(args) do
        if name ~= "sign" then
            names[#names + 1] = name
        end
    end
    table.sort(names)
    local parts = {}
    -- ipairs guarantees the sorted order; pairs gives no ordering guarantee.
    for i, name in ipairs(names) do
        local value = args[name]
        if type(value) ~= "string" then
            -- Valueless (true) or repeated (table) arguments cannot be
            -- concatenated into the string to sign.
            return false
        end
        parts[i] = name .. "=" .. value
    end
    parts[#parts + 1] = "key=" .. appsecret
    return string.upper(ngx.md5(table.concat(parts, "&")))
end
--- Initialise one field of the daily counter hash from the application record.
-- Reads `field` from the hash  mdw_prefix .. ":" .. appkey, stores
-- value + ill_count under the same field name in redis_key, gives redis_key
-- a TTL of 108000 seconds (30 hours) and returns the connection to the pool.
-- Every failure is answered directly through mess_return (status 539 when
-- the record or field does not exist, 549 when no numeric value could be
-- read), in which case this function does not return.
-- @param field name of the quota field, e.g. "total_num"
local hget_key = function(field)
    local mess = {}
    local app_key = mdw_prefix .. ":" .. appkey
    local r, err = red:EXISTS(app_key)
    if r == 0 then
        mess["message"] = appkey .. "不存在,请稍后再试或联系客服"
        mess_return(mess, 539)
    end
    r, err = red:hexists(app_key, field)
    if r == 0 then
        mess["message"] = appkey .. "字段不存在" .. field
        mess_return(mess, 539)
    end
    r, err = red:hget(app_key, field)
    -- Only a numeric string is usable: a nil reply (error) or ngx.null
    -- (field vanished since the hexists check) would otherwise raise an
    -- arithmetic error on the addition below.
    local quota
    if type(r) == "string" then
        quota = tonumber(r)
    end
    if quota then
        red:hset(redis_key, field, quota + ill_count)
        red:expire(redis_key, 108000)
        red:set_keepalive(100000, 100)
    else
        if not r then
            ngx_log(ngx_err, "redis hget ", field, " failed: ", err)
        end
        mess["message"] = appkey .. "提取数据失败,请稍后再试或联系客服"
        mess_return(mess, 549)
    end
end
-- Redis setup: 100 ms timeout for connect/send/read.
red:set_timeout(100)
local red_option = {}
red_option["pool_size"] = 50
red_option["backlog"] = 100
local ok, err = red:connect("r-2zejf1hgj2eej5br0x.redis.rds.aliyuncs.com", 6379, red_option)
if not ok then
    -- Record the failure instead of dropping it silently.
    ngx_log(ngx_err, "redis connect failed, signature and quota checks skipped: ", err)
    -- NOTE(review): ngx.exit(0) ends this handler with ngx.OK, so the request
    -- presumably continues to the backend unverified whenever Redis is
    -- unreachable. Kept as-is because it may be a deliberate availability
    -- choice — confirm, and reply with an error here if it is not.
    ngx.exit(0)
end
-- Signature verification.
-- Two schemes are supported: "signRan" (appkey + timer + nonce) and, when no
-- signRan is supplied, the legacy "sign" over all sorted query arguments.
local sign_status = "no"
local appsecret = find_appsecret()
local signRan = args["signRan"]
-- A usable secret is a non-empty string. find_appsecret can yield nil on a
-- Redis error, and lua-resty-redis returns ngx.null for a missing field;
-- with either one the generators cannot compute a real signature, so the
-- request must be rejected without consulting them.
local secret_ok = type(appsecret) == "string" and appsecret ~= ""
if secret_ok then
    if signRan then
        -- All inputs of the scheme must be plain strings; otherwise the
        -- generator has nothing to sign and the request stays unverified.
        if type(signRan) == "string" and type(timer) == "string" and type(nonce) == "string" then
            local authmd5 = gen_md5token(appsecret)
            if signRan == authmd5 then
                sign_status = "yes"
            end
        end
    else
        -- Legacy scheme
        local sign = args["sign"]
        if type(sign) == "string" then
            local authmd5_old = gen_md5token_old(appsecret)
            if sign == authmd5_old then
                sign_status = "yes"
            end
        end
    end
end
if sign_status ~= "yes" then
    local mess = {}
    mess["message"] = "签名验证失败"
    mess_return(mess, 479)
end
ngx.header["check_sign"] = "yes"
-- Quota query endpoint for the key/value layout: returns every key matching
-- nginx:<appkey>* together with its value.
-- NOTE(review): KEYS scans the whole keyspace and blocks Redis while it
-- runs; consider SCAN if this endpoint is used on a large instance.
if uri == "/query" then
    local mess = {}
    local rk = "nginx:" .. appkey .. "*"
    local r, keys_err = red:keys(rk)
    if type(r) ~= "table" then
        -- A nil reply would otherwise raise an error on #r below.
        ngx_log(ngx_err, "redis keys failed: ", keys_err)
        mess["message"] = "查询失败,请重试或联系客服"
        mess_return(mess)
    end
    red:init_pipeline()
    for i = 1, #r do
        red:get(r[i])
    end
    local results, err = red:commit_pipeline()
    if not results then
        -- Logged rather than written to the client: raw text in front of
        -- the JSON body would make the response unparseable.
        ngx_log(ngx_err, "failed to commit the pipelined requests: ", err)
        mess["message"] = "查询失败,请重试或联系客服"
    else
        for i = 1, #r do
            -- Report each key without its first ":"-separated segment.
            -- Done with plain string functions because no `split` helper is
            -- defined in the visible script.
            local key = r[i]
            local sep = string.find(key, ":", 1, true)
            local name = ""
            if sep then
                name = string.sub(key, sep + 1)
            end
            mess[name] = results[i]
        end
    end
    mess_return(mess)
end
-- Quota query endpoint for the hash layout: returns every field of the
-- daily counter hash of this appkey.
if uri == "/query_new" then
    local mess = {}
    local r, err = red:hgetall(redis_key)
    if type(r) ~= "table" then
        -- A nil reply would otherwise raise an error inside array_to_hash.
        ngx_log(ngx_err, "redis hgetall failed: ", err)
        mess["message"] = "查询失败,请重试或联系客服"
    else
        -- hgetall replies with a flat {field, value, ...} array.
        local rd = red:array_to_hash(r)
        for k, v in pairs(rd) do
            mess[k] = v
        end
    end
    mess_return(mess)
end
-- Endpoint that sets a counter field of the daily hash to a given value.
-- NOTE(review): the only gate in front of this block is the signature
-- check, so any caller with a valid appkey can presumably overwrite its own
-- counters — confirm the location is not reachable by ordinary clients.
-- NOTE(review): the value is stored as given, without the ill_count offset
-- that hget_key adds — confirm callers pass an already-offset value.
if uri == "/add_total" then
    local u = args["field"]
    local t = args["total"]
    -- Both arguments must be plain strings and the total an integer: a
    -- missing argument would be stringified by the Redis client, and a
    -- non-integer value makes later hincrby calls on the field fail.
    if type(u) ~= "string" or u == ""
        or type(t) ~= "string" or not string.match(t, "^%-?%d+$") then
        local mess = {}
        mess["message"] = "field或total参数缺失或格式错误"
        mess_return(mess, 400)
    end
    local r, err = red:hset(redis_key, u, t)
    if r then
        local mess = {}
        mess["message"] = "重置次数成功"
        mess_return(mess)
    end
    ngx_log(ngx_err, "redis hset failed: ", err)
    ngx.exit(0)
end
-- Total-quota enforcement: decrement the daily "total_num" counter and act
-- on the value it has afterwards.
local mess = {}
local total_keydecr_res = appkey_decr_hset("total_num")
if type(total_keydecr_res) ~= "number" then
    -- The decrement failed (appkey_decr_hset returned nothing); comparing
    -- nil with a number below would raise an error.
    ngx_log(ngx_err, "redis hincrby total_num failed for ", redis_key)
    mess["message"] = appkey .. "提取数据失败,请稍后再试或联系客服"
    mess_return(mess, 549)
end
if total_keydecr_res < 0 then
    -- Negative: the counter was not initialised yet; load the quota from
    -- the application record (hget_key releases the connection itself).
    hget_key("total_num")
elseif total_keydecr_res < ill_count then
    --use_local_cache(redis_key_total, total_keydecr_res, keywords[1])
    -- Below the ill_count offset: the quota is used up.
    mess["message"] = "appkey的总调用次数已经耗尽"
    mess_return(mess, 439)
else
    -- Quota remains and the request proceeds: return the connection to the
    -- keepalive pool. This must stay the last Redis use in the script.
    red:set_keepalive(100000, 100)
end
-- 接口次数限制功能已经调试完成,暂时注释。后端开发完成后可以直接启用
-- local keydecr_res = appkey_decr_hset(uri)
-- if keydecr_res < 0 then
-- hget_key(uri)
-- elseif keydecr_res < ill_count then
-- --use_local_cache(redis_key, keydecr_res, keywords[2])
-- mess["message"] = "该接口调用次数已经耗尽"
-- mess_return(mess, 449)
-- end
--local mess = {}
---- 调试
-- local authmd5=12
-- local timer = 11
-- --mess["massage"] = "调用成功res:" .. keydecr_res .. "总" .. total_keydecr_res .. " key_name:" .. redis_key .. timer .. " " .. authmd5
-- mess["massage"] = "调用成功总:" .. total_keydecr_res .. " key_name:" .. redis_key
-- mess_return(mess)