全局 Lua 插件

我们支持用户编写 Lua 插件来执行一些自定义功能,可以选择定时触发或者事件触发。

例如,我们可以每分钟从数据库中查询 CPU 使用率较高的节点,并通过 HTTP 请求将其报告给用户自己的监控系统。

下面是一个关于如何创建 Lua 插件的实际例子。

Screenshot

我们用下面的代码创建一个定时触发的插件:


local sql = [[
select node_id, avg("system_CPU_percent") from monitor
where created > now() - INTERVAL '1 hour' group by node_id limit 1
]]
local res = sql_query(sql, 120, 2000, "log_server")

local params = {
    body = res
}
res = http_query('POST', "http://receive-metrics.openresty.com", params)

output(res)

我们点击这个执行按钮可以立即看到执行结果。

Screenshot

可以在执行历史页面中看到每次插件执行的结果。

Screenshot

Screenshot

此外,Lua 插件也可以由事件触发。当指定的事件发生时,它将触发插件运行,并通过 Lua 变量 trigger_event 传递给插件。

下面是一个打印触发事件的例子。

output(trigger_event)

Screenshot

点击执行按钮会触发一个 node 下线的测试事件。

Screenshot

事件

我们现在支持以下这些事件:

Nodes Heartbeat Offline

节点没有发送心跳到 Admin,被设置成离线时触发 nodes_heartbeat_offline 事件。

{
    "type": "nodes_heartbeat_offline",
    "from": "log-server",
    "message": "Gateway nodes [59] offline",
    "level": "ERROR"
}

Nodes Heartbeat Online

节点发送心跳到 Admin,被设置成在线时触发 nodes_heartbeat_online 事件。

{
    "type": "nodes_heartbeat_online",
    "from": "log-server",
    "message": "Gateway nodes [59] online",
    "level": "WARNING"
}

Nodes Offline

节点健康检查失败,被设置成下线时触发 nodes_offline 事件。

{
    "from": "admin",
    "level": "ERROR",
    "message": "gateway node [78] is offline since failed to connect to 120.24.93.4:81: connection refused, time: 1634629224;;failed to connect to 120.24.93.4:81: connection refused, time: 1634629254;;failed to connect to 120.24.93.4:81: connection refused, time: 1634629284",
    "type": "node_offline"
}

Nodes Online

节点健康检查成功,被设置成上线时触发 nodes_online 事件。

{
    "from": "admin",
    "level": "WARNING",
    "message": "gateway node [78] is online since health check success",
    "type": "node_online"
}

Release

发布应用时触发 release 事件。

{
    "type": "release",
    "uid": 2,
    "http_app_id": 786
}

WAF Event

当请求命中 WAF 规则时,并且分数达到阈值,会触发 waf_event 事件。

相同客户端 IP 的请求一秒内只会触发一次事件

{
    "app_id": "1033",
    "type": "waf_event",
    "score": "3",
    "threshold": "3",
    "action": "block",
    "matches": [
        {
            "matches": [
                "0",
                "/hit"
            ],
            "begin_line": 1,
            "version": "d04fb751526fc85a172475a71f19cf53",
            "rule_id": "0",
            "rule_set_id": 10025,
            "msg": "test",
            "group": "test",
            "end_line": 2
        }
    ],
    "header": "User-Agent: curl/7.29.0\r\nHost: waf-filter.com\r\nAccept: */*\r\nProxy-Connection: Keep-Alive\r\n\r\n",
    "timestamp": "1634629481",
    "request_id": "0000270010243927eb48000d",
    "client_country": "",
    "client_province": "unknown",
    "client_city": "unknown",
    "from": "log_server",
    "client_isp": "",
    "body": "",
    "remote_addr": "172.17.0.1",
    "host": "waf-filter.com",
    "request": "GET HTTP://waf-filter.com/hit HTTP/1.1"
}

Builtin API

大部分 Lua 代码是可以在插件中运行的,我们也提供了一些内置的 API。

output

syntax: output(msg)

将消息输出到执行历史中。

http_query

syntax: res = http_query(method, url, params, retries)

发送 HTTP 请求。

  • method HTTP 请求方法,比如 GET, POST, PUT
  • url HTTP 请求 url 字符串
  • retries 请求失败重试次数,默认是 1

params 支持这些字段:

  • timeout 设置请求超时时间,单位是毫秒,默认超时时间为 300 秒
  • headers 设置请求头
  • body 设置请求体
  • ssl_verify 启用 SSL 证书验证,默认关闭

当请求成功,会返回一个 Lua Table,会包含以下这些字段:

  • status 响应状态码
  • headers 响应头
  • body 响应体

sql_query

查询 Admin 或者 Log Server 的数据库。

syntax: res = sql_query(sql, timeout, limit, destination, retries)

  • sql 查询 SQL 语句,只支持 select 类型的语句
  • timeout 设置查询超时时间,单位是秒,默认超时时间为 120 秒
  • limit 设置结果限制数,默认返回 20000 条查询结果
  • destination 查询目标,选择 admin 或者 log-server
  • retries 查询失败重试次数,默认是 1

send_alarm_event

发送自定义报警事件

syntax: res = send_alarm_event(alarm_type, alarm_level, alarm_message)

  • alarm_type 自定义报警类型
  • alarm_level 报警等级,有三个等级:CRITICAL, ERROR,WARNING
  • alarm_message 报警文本内容

更多示例

添加被 WAF 规则拦截的客户端 IP 到应用 IP 列表中

这里我们需要先在应用内使用 WAFIP List.

-- In this example we assume that the application id and IP list id are both 1.
local app_id = "1"
local ip_list_id = "1"

local str_fmt = string.format
local api_put = require "Lua.SchemaDB" .update

-- Get the IP addresses blocked by WAF in the last 24 hours
local sql = [[
SELECT DISTINCT remote_addr as ip
FROM waf_request_tsdb
WHERE action='block'
        AND score >= threshold
        AND created >= now() - interval '24 hours'
        AND app_id='%s'
]]

sql = str_fmt(sql, app_id)
local res = sql_query(sql, 120, 2000, "log_server")
local ip_list = { items = res }

local uri = { "applications", app_id, "ip_list", ip_list_id }
res, err = api_put(uri, ip_list)

if res then
    output("updated ip blacklist successfully!")
else
    output("failed to update ip blacklist: " .. tostring(err))
end

校验指定 APP_ID 应用的 HTTPS 证书

-- 请手动更新这一行
local app_id = nil

-- 移除下面一行注释,如果你想在触发事件中使用
-- local app_id = trigger_event.http_app_id

if not app_id then
    return output("WARN: app_id is required")
end

local ngx = ngx
local substr = string.sub
local str_fmt = string.format
local re_find = ngx.re.find
local httpc = require "resty.http".new()

local function ssl_handshake(ip, port, domain)
    local c, err = httpc:connect(ip, port)
    if not c then
        return nil, err
    end

    return httpc:ssl_handshake(nil, domain, true)
end

local function is_wildcard_domain(domain)
    if string.sub(domain, 1, 2) == '*.' then
        return true
    end

    return false
end

local app_domains_sql = [[
select applications_domains.domain "domain",
    applications_domains.is_wildcard is_wildcard,
    https_ports,
    offline_enabled
    from applications
left join applications_domains on applications.id = applications_domains._applications_id
where applications.id = %d
]]

local cert_domains_sql = [[
select applications_phases_ssl_cert_certs_acme_host.item acme_host
    from applications_phases_ssl_cert_certs
join applications_phases_ssl_cert_certs_acme_host on
    applications_phases_ssl_cert_certs_acme_host._applications_phases_ssl_cert_certs_id
    = applications_phases_ssl_cert_certs.id
where global_cert is null and applications_phases_ssl_cert_certs._applications_id = %d
union
select global_certs.acme_host acme_host
    from applications_phases_ssl_cert_certs
join global_certs on applications_phases_ssl_cert_certs.global_cert = global_certs.id
where global_cert is not null and applications_phases_ssl_cert_certs._applications_id = %d
]]

local gateway_nodes_sql = [[
select gateway_nodes.external_ip external_ip,
    gateway_nodes.external_ipv6 external_ipv6
    from applications
left join applications_partitions on applications.id = applications_partitions._applications_id
left join gateway on applications_partitions.item = gateway.partition
left join gateway_nodes on gateway.id = gateway_nodes._gateway_id
where offline_enabled is not true
    and (gateway_nodes.external_ip is not null or gateway_nodes.external_ipv6 is not null)
    and applications.id = %d
]]

local ok_tbl = {}
local err_tbl = {}
local check_list = {}
local app_domains_hash = {}

local app_domains, err = sql_query(str_fmt(app_domains_sql, tonumber(app_id)))
if not app_domains then
    output(str_fmt("failed to verify TLS certificate for app, app_id: %d"), tostring(app_id))
end

local domain_list = {}
for _, app_domain in ipairs(app_domains) do
    domain_list[#domain_list + 1] = app_domain.domain

    app_domains_hash[app_domain.domain] = app_domain
end

local domain_str = table.concat(domain_list, ', ')

local cert_domains, err = sql_query(str_fmt(cert_domains_sql, tonumber(app_id), tonumber(app_id)))
if not cert_domains then
    output("failed to verify TLS certificate for app, domain: %s, err: no certificate found", domain_str)
end

local gateway_nodes, err = sql_query(str_fmt(gateway_nodes_sql, tonumber(app_id)))
if not gateway_nodes then
    output("failed to verify TLS certificate for app, domain: %s, err: no gateway nodes found", domain_str)
end

for _, cert_domain in ipairs(cert_domains) do
    local acme_host = cert_domain.acme_host

    if is_wildcard_domain(acme_host) and app_domains_hash[acme_host] then
        err_tbl[#err_tbl + 1] ="wildcard app with wildcard cert is not supported yet:" .. tostring(acme_host)
        goto _next_
    end

    if is_wildcard_domain(acme_host) then
        for _, app_domain in ipairs(app_domains) do
            local domain = app_domain.domain
            local base_acme_host = substr(acme_host, 2)

            if re_find(domain, [[\A(?:\Q\E.*?\Q]] .. base_acme_host .. [[\E)]], 'josm') then
                check_list[#check_list + 1] = app_domains_hash[app_domain]
            end
        end

        goto _next_
    end

    local hit = false

    for _, app_domain in ipairs(app_domains) do
        local domain = app_domain.domain

        if domain == acme_host then
            check_list[#check_list + 1] = app_domains_hash[acme_host]

            goto _next_
        end

        if is_wildcard_domain(domain) then
            local base_domain = substr(domain, 2)

            if re_find(acme_host, [[\A(?:\Q\E.*?\Q]] .. base_domain .. [[\E)]], 'josm') then
                local app_obj = app_domains_hash[domain]

                check_list[#check_list + 1] = {
                    domain = acme_host,
                    is_wildcard = app_obj.is_wildcard,
                    https_ports = app_obj.https_ports
                }
                hit = true
            end
        end
    end

    if hit then
        goto _next_
    end

    err_tbl[#err_tbl + 1] = str_fmt(
        "certificate with Common Name '%s' not found matched host in current application '%s'",
        acme_host, domain_str)

    ::_next_::
end

local check_domain_list = {}

for _, check_obj in pairs(check_list) do
    local check_domain = check_obj.domain
    local https_ports = check_obj.https_ports

    check_domain_list[#check_domain_list + 1] = check_domain

    for _, gateway_node in ipairs(gateway_nodes) do
        local ip = gateway_node.external_ip or gateway_node.external_ipv6
        if not ip then
            goto _next_node_
        end

        for _, https_port in ipairs(https_ports) do
            local ok, err = ssl_handshake(ip, https_port, check_domain)
            if not ok then
                err_tbl[#err_tbl + 1] = str_fmt("domain '%s' on ip '%s': '%s'",
                    check_domain, ip, err)
                goto _next_node_
            end

            ok_tbl[#ok_tbl + 1] = str_fmt("domain '%s' on ip '%s'", check_domain, ip)
        end

        ::_next_node_::
    end
end

if #err_tbl == 0 then
    output("OK: all TLS certificates are verified:" .. table.concat(check_domain_list, ','))

else
    output("ERR: following TLS certificates are failed:" .. table.concat(err_tbl, "\n === \n"))

    if #ok_tbl > 0 then
        output("INFO: following TLS certificates are verified:" .. table.concat(ok_tbl, "\n === \n"))

    else
        output("ERR: no TLS certificate is verified successfully")
    end
end