kafka-logger
#
描述kafka-logger
插件用于将日志作为 JSON 对象推送到 Apache Kafka 集群中。可用作 ngx_lua
NGINX 模块的 Kafka 客户端驱动程序。
#
属性名称 | 类型 | 必选项 | 默认值 | 有效值 | 描述 |
---|---|---|---|---|---|
broker_list | object | 是 | 已废弃,现使用 brokers 属性代替。原指需要推送的 Kafka 的 broker 列表。 | ||
brokers | array | 是 | 需要推送的 Kafka 的 broker 列表。 | ||
brokers.host | string | 是 | Kafka broker 的节点 host 配置,例如 192.168.1.1 | ||
brokers.port | string | 是 | Kafka broker 的节点端口配置 | ||
brokers.sasl_config | object | 否 | Kafka broker 中的 sasl_config | ||
brokers.sasl_config.mechanism | string | 否 | "PLAIN" | ["PLAIN"] | Kafka broker 中的 sasl 认证机制 |
brokers.sasl_config.user | string | 是 | Kafka broker 中 sasl 配置中的 user,如果 sasl_config 存在,则必须填写 | ||
brokers.sasl_config.password | string | 是 | Kafka broker 中 sasl 配置中的 password,如果 sasl_config 存在,则必须填写 | ||
kafka_topic | string | 是 | 需要推送的 topic。 | ||
producer_type | string | 否 | async | ["async", "sync"] | 生产者发送消息的模式。 |
required_acks | integer | 否 | 1 | [1, -1] | 生产者在确认一个请求发送完成之前需要收到的反馈信息的数量。该参数是为了保证发送请求的可靠性。该属性的配置与 Kafka acks 属性相同,具体配置请参考 Apache Kafka 文档。required_acks 还不支持为 0。 |
key | string | 否 | 用于消息分区而分配的密钥。 | ||
timeout | integer | 否 | 3 | [1,...] | 发送数据的超时时间。 |
name | string | 否 | "kafka logger" | 标识 logger 的唯一标识符。如果您使用 Prometheus 监视 APISIX 指标,名称将以 apisix_batch_process_entries 导出。 | |
meta_format | enum | 否 | "default" | ["default","origin"] | default :获取请求信息以默认的 JSON 编码方式。origin :获取请求信息以 HTTP 原始请求方式。更多信息,请参考 meta_format。 |
log_format | object | 否 | 以 JSON 格式的键值对来声明日志格式。对于值部分,仅支持字符串。如果是以 $ 开头,则表明是要获取 APISIX 变量 或 NGINX 内置变量。 | ||
include_req_body | boolean | 否 | false | [false, true] | 当设置为 true 时,包含请求体。注意:如果请求体无法完全存放在内存中,由于 NGINX 的限制,APISIX 无法将它记录下来。 |
include_req_body_expr | array | 否 | 当 include_req_body 属性设置为 true 时进行过滤。只有当此处设置的表达式计算结果为 true 时,才会记录请求体。更多信息,请参考 lua-resty-expr。 | ||
max_req_body_bytes | integer | 否 | 524288 | >=1 | 允许的最大请求正文(以字节为单位)。在此限制内的请求体将被推送到 Kafka。如果大小超过配置值,则正文在推送到 Kafka 之前将被截断。 |
include_resp_body | boolean | 否 | false | [false, true] | 当设置为 true 时,包含响应体。 |
include_resp_body_expr | array | 否 | 当 include_resp_body 属性设置为 true 时进行过滤。只有当此处设置的表达式计算结果为 true 时才会记录响应体。更多信息,请参考 lua-resty-expr。 | ||
max_resp_body_bytes | integer | 否 | 524288 | >=1 | 允许的最大响应正文(以字节为单位)。低于此限制的响应主体将被推送到 Kafka。如果大小超过配置值,则正文在推送到 Kafka 之前将被截断。 |
cluster_name | integer | 否 | 1 | [0,...] | Kafka 集群的名称,当有两个及以上 Kafka 集群时使用。只有当 producer_type 设为 async 模式时才可以使用该属性。 |
producer_batch_num | integer | 否 | 200 | [1,...] | 对应 lua-resty-kafka 中的 batch_num 参数,聚合消息批量提交,单位为消息条数。 |
producer_batch_size | integer | 否 | 1048576 | [0,...] | 对应 lua-resty-kafka 中的 batch_size 参数,单位为字节。 |
producer_max_buffering | integer | 否 | 50000 | [1,...] | 对应 lua-resty-kafka 中的 max_buffering 参数,表示最大缓冲区,单位为条。 |
producer_time_linger | integer | 否 | 1 | [1,...] | 对应 lua-resty-kafka 中的 flush_time 参数,单位为秒。 |
meta_refresh_interval | integer | 否 | 30 | [1,...] | 对应 lua-resty-kafka 中的 refresh_interval 参数,用于指定自动刷新 metadata 的间隔时长,单位为秒。 |
该插件支持使用批处理器来聚合并批量处理条目(日志/数据)。这样可以避免插件频繁地提交数据,默认设置情况下批处理器会每 5
秒钟或队列中的数据达到 1000
条时提交数据,如需了解批处理器相关参数设置,请参考 Batch-Processor 配置部分。
提示
数据首先写入缓冲区。当缓冲区超过 batch_max_size
或 buffer_duration
设置的值时,则会将数据发送到 Kafka 服务器并刷新缓冲区。
如果发送成功,则返回 true
。如果出现错误,则返回 nil
,并带有描述错误的字符串 buffer overflow
。
#
meta_format 示例default
:{
"upstream": "127.0.0.1:1980",
"start_time": 1619414294760,
"client_ip": "127.0.0.1",
"service_id": "",
"route_id": "1",
"request": {
"querystring": {
"ab": "cd"
},
"size": 90,
"uri": "/hello?ab=cd",
"url": "http://localhost:1984/hello?ab=cd",
"headers": {
"host": "localhost",
"content-length": "6",
"connection": "close"
},
"body": "abcdef",
"method": "GET"
},
"response": {
"headers": {
"connection": "close",
"content-type": "text/plain; charset=utf-8",
"date": "Mon, 26 Apr 2021 05:18:14 GMT",
"server": "APISIX/2.5",
"transfer-encoding": "chunked"
},
"size": 190,
"status": 200
},
"server": {
"hostname": "localhost",
"version": "2.5"
},
"latency": 0
}origin
:GET /hello?ab=cd HTTP/1.1
host: localhost
content-length: 6
connection: close
abcdef
#
插件元数据名称 | 类型 | 必选项 | 默认值 | 描述 |
---|---|---|---|---|
log_format | object | 否 | 以 JSON 格式的键值对来声明日志格式。对于值部分,仅支持字符串。如果是以 $ 开头,则表明是要获取 APISIX 变量 或 NGINX 内置变量。 |
注意
该设置全局生效。如果指定了 log_format
,则所有绑定 kafka-logger
的路由或服务都将使用该日志格式。
以下示例展示了如何通过 Admin API 配置插件元数据:
note
您可以这样从 config.yaml
中获取 admin_key
并存入环境变量:
admin_key=$(yq '.deployment.admin.admin_key[0].key' conf/config.yaml | sed 's/"//g')
curl http://127.0.0.1:9180/apisix/admin/plugin_metadata/kafka-logger \
-H "X-API-KEY: $admin_key" -X PUT -d '
{
"log_format": {
"host": "$host",
"@timestamp": "$time_iso8601",
"client_ip": "$remote_addr"
}
}'
配置完成后,你将在日志系统中看到如下类似日志:
{"host":"localhost","@timestamp":"2020-09-23T19:05:05-04:00","client_ip":"127.0.0.1","route_id":"1"}
{"host":"localhost","@timestamp":"2020-09-23T19:05:05-04:00","client_ip":"127.0.0.1","route_id":"1"}
#
如何启用你可以通过如下命令在指定路由上启用 kafka-logger
插件:
curl http://127.0.0.1:9180/apisix/admin/routes/1 \
-H "X-API-KEY: $admin_key" -X PUT -d '
{
"plugins": {
"kafka-logger": {
"brokers" : [
{
"host": "127.0.0.1",
"port": 9092
}
],
"kafka_topic" : "test2",
"key" : "key1"
}
},
"upstream": {
"nodes": {
"127.0.0.1:1980": 1
},
"type": "roundrobin"
},
"uri": "/hello"
}'
该插件还支持一次推送到多个 Broker,示例如下:
"brokers" : [
{
"host" :"127.0.0.1",
"port" : 9092
},
{
"host" :"127.0.0.1",
"port" : 9093
}
],
#
测试插件你可以通过以下命令向 APISIX 发出请求:
curl -i http://127.0.0.1:9080/hello
#
删除插件当你需要删除该插件时,可以通过如下命令删除相应的 JSON 配置,APISIX 将会自动重新加载相关配置,无需重启服务:
curl http://127.0.0.1:9180/apisix/admin/routes/1 \
-H "X-API-KEY: $admin_key" -X PUT -d '
{
"methods": ["GET"],
"uri": "/hello",
"plugins": {},
"upstream": {
"type": "roundrobin",
"nodes": {
"127.0.0.1:1980": 1
}
}
}'