lua-resty-kafka-fast
Name
lua-resty-kafka-fast - Lua Kafka client driver for OpenResty.
Table of Contents
Status
This library is considered production-ready.
Description
This Lua library is a Kafka client driver for working with the ngx_lua nginx module:
https://github.com/openresty/lua-nginx-module/#readme
Synopsis
load_module "/usr/local/openresty/nginx/modules/ngx_http_lua_kafka_module.so";

thread_pool lua_kafka threads=16;

http {
    # Each kafka client occupies one thread from the "lua_kafka" pool,
    # so keep this value in sync with the thread_pool size above.
    lua_kafka_max_clients 16;

    # you do not need the following line if you are using
    # the OpenResty bundle:
    lua_package_path "/usr/local/openresty/site/lualib/?.lua;;";

    init_worker_by_lua_block {
        -- guard flag so overlapping timer ticks never spawn a second consumer
        local is_kafka_client_running = false

        local function kafka_consumer_imp()
            local kafka = require "resty.kafka.fast"
            local topics = {{topic = "topic1"}, {topic = "topic2"}}
            local consumer, err = kafka.new_consumer("127.0.0.1:9094,127.0.0.2:9094",
                                                     {["auto.offset.reset"] = "beginning",
                                                      ["broker.address.family"] = "v4"},
                                                     "group-name", topics)
            if not consumer then
                ngx.log(ngx.ERR, "create kafka consumer failed: ", err)
                return
            else
                ngx.log(ngx.INFO, "create kafka consumer success")
            end

            -- consume up to 100 messages per timer tick
            for i = 1, 100 do
                if ngx.worker.exiting() then
                    break
                end

                local msg, err = consumer:read()
                if not msg then
                    -- 'read timeout' is not a fatal error.
                    -- It just means there is no message from the kafka server yet,
                    -- so the 'read timeout' error can be skipped.
                    if err ~= "read timeout" then
                        ngx.log(ngx.ERR, "read failed: ", err)
                    end
                else
                    ngx.log(ngx.ERR, "topic: ", msg.topic or "",
                            ", key: ", msg.key or "",
                            ", payload: ", msg.payload or "",
                            ", offset: ", msg.offset or "",
                            ", partition: ", msg.partition or "")
                end
            end

            consumer:close()
        end

        local function kafka_consumer(premature)
            if premature or is_kafka_client_running then
                return
            end

            is_kafka_client_running = true

            -- pcall so a consumer failure cannot abort the timer handler
            -- and leave is_kafka_client_running stuck at true
            local ok, err = pcall(kafka_consumer_imp)
            if not ok then
                ngx.log(ngx.ERR, err)
            end

            is_kafka_client_running = false
        end

        local function kafka_producer(premature)
            local kafka = require "resty.kafka.fast"

            -- invoked once per produced message, whether it was
            -- delivered successfully or failed (err is set on failure)
            local function msg_hdl(topic, key, message, partition, offset, err)
                ngx.log(ngx.INFO, "topic: ", topic, ", key: ", key,
                        ", message: ", #message, ", partition: ", partition,
                        ", offset: ", offset, ", error: ", err)
            end

            local function error_handler(err)
                ngx.log(ngx.ERR, "kafka producer error: ", err)
            end

            local p, err = kafka.new_producer("127.0.0.1:9094,127.0.0.2:9094",
                                              {
                                                  producer_type = "async", -- default is "sync"
                                                  error_handler = error_handler, -- optional
                                                  message_handler = msg_hdl, -- optional
                                              })
            if p == nil then
                ngx.log(ngx.ERR, "failed to create kafka producer: ", err)
                return
            end

            local messages = {
                {topic = "test-1", key = "k1", message = "abc"},
                {topic = "test-2", key = "k2", message = "def"},
                {topic = "test-3", key = "k3", message = "hig"},
            }

            local res, err = p:send_multi(messages)
            if not res then
                ngx.log(ngx.ERR, "failed to send kafka messages: ", err)
                return
            end

            res, err = p:send("topic-name", "key", "value")
            if not res then
                ngx.log(ngx.ERR, "failed to send kafka messages: ", err)
                return
            end
        end

        local ok, err = ngx.timer.every(10, kafka_consumer)
        if not ok then
            ngx.log(ngx.ERR, "failed to set timer for kafka_consumer: ", err)
            return
        end

        local ok, err = ngx.timer.every(10, kafka_producer)
        if not ok then
            ngx.log(ngx.ERR, "failed to set timer for kafka_producer: ", err)
            return
        end
    }
}
Directives
lua_kafka_max_clients
syntax: lua_kafka_max_clients [num]
default: lua_kafka_max_clients 128
context: http
Specifies the maximum number of kafka clients allowed per worker process.
Each client consumes one thread, so the number of clients should be the same as the value configured for the thread pool lua_kafka
.
Methods
new_consumer
syntax: consumer, err = kafka.new_consumer(brokers, opts, group_id, topics)
Creates a Kafka consumer object. In case of failures, returns nil
and a string describing the error.
The brokers
argument is a comma-separated list of hosts or host:port pairs (the default port is 9092).
lua-resty-kafka-fast will use the bootstrap brokers to acquire the complete set of brokers from the cluster.
The opts
argument is a Lua table holding the following keys:
auto.offset.reset
Action to take when there is no initial offset in the offset store or the desired offset is out of range: ‘smallest’,’earliest’ - automatically reset the offset to the smallest offset, ’largest’,’latest’ - automatically reset the offset to the largest offset, ’error’ - trigger an error which is retrieved by consuming messages and checking
err
.The values:
smallest
,earliest
,beginning
,largest
,latest
,end
,error
.Default values:
largest
broker.address.family
Allowed broker IP address families: any, v4, v6 The values:
any
v4
,v6
.Default value:
any
.enable.partition.eof
Emit RD_KAFKA_RESP_ERR__PARTITION_EOF event whenever the consumer reaches the end of a partition.
The values:
true
,false
Default value:
true
security.protocol
Protocol used to communicate with brokers.
The values:
plaintext
,ssl
Default value:
plaintext
enable.ssl.certificate.verification
Enable OpenSSL’s builtin broker (server) certificate verification. The values can be
true
orfalse
. The default value istrue
.ssl.key.location
Path to client’s private key (PEM) used for authentication.
ssl.key.pem
Client’s private key string (PEM format) used for authentication.
ssl.key.password
Private key passphrase
ssl.certificate.location
Path to client’s public key (PEM) used for authentication.
ssl.certificate.pem
Client’s public key string (PEM format) used for authentication.
ssl.ca.location
File or directory path to CA certificate(s) for verifying the broker’s key.
ssl.ca.pem
CA certificate string (PEM format) for verifying the broker’s key.
ssl.cipher.suites
A cipher suite is a named combination of authentication, encryption, MAC, and key exchange algorithms that are used to negotiate the security settings for a network connection using TLS or SSL network protocol. See manual page for
ciphers(1)
and `SSL_CTX_set_cipher_list(3).ssl.crl.location
Path to CRL to verify the validity of the broker’s certificate.
socket.timeout.ms
Default timeout for network requests. Consumer: FetchRequests will use fetch.wait.max.ms + socket.timeout.ms.
The default value is 60000 ms.
socket.connection.setup.timeout.ms
The maximum time allowed for broker connection setup (TCP connection setup as well as SSL and SASL handshake). If the connection to the broker is not fully functional after this, it will be closed and retried.
The default value is 30000 ms.
fetch.wait.max.ms
Maximum time the broker may wait to fill the Fetch response with fetch.min.bytes of messages.
The default value is 500 ms.
The
group_id
argument is a Lua string holding the group ID of the consumer. This argument can be nil.The
topics
argument is a Lua table holding one or more topics. Each topic contains the following fields:topic
The topic to be consumed. This field must be specified.
partition
The number of partitions to be consumed. This field is optional.
offset
The start offset of the topic to be consumed. This field is optional.
The values: kafka.OFFSET_BEGINNING, kafka.OFFSET_END, kafka.OFFSET_STORED
For example:
{ topic = "topic-name", partition = 0, offset = 1000 }
or
{ topic = "topic-name" }
consumer.read
syntax: msg, err = consumer:read()
Read a message from the Kafka server. In case of failures, returns nil
and a string describing the error.
Consumer errors are generally to be considered informational as the consumer library will automatically try to recover from all types of errors. To prevent Lua code from being blocked for long periods of time, a “read timeout” error message is returned if no message is received from the Kafka server for more than 1s.
Here are some error messages you might be interested in:
- “read timeout”: This error gives you the opportunity to exit the message handler function if you don’t receive a message from kafka for a long time.
- “Subscribed topic not available: topic-ts1-tc23: Broker: Unknown topic or partition”
- “Fetch from broker 101 reached end of partition at offset 4 (HighwaterMark 4)”
new_producer
syntax: producer, err = kafka.new_producer(brokers, opts, cluster_name)
Creates a Kafka producer object. In case of failures, returns nil
and a string describing the error.
The brokers
argument is a comma-separated list of hosts or host:port pairs (the default port is 9092).
lua-resty-kafka-fast will use the bootstrap brokers to acquire the full set of brokers from the cluster.
The opts
argument is a Lua table holding the following keys:
producer_type
Specifies the producer.type. “async” or “sync”
broker.address.family
Allowed broker IP address families: any, v4, v6
The values:
any
v4
,v6
.Default value:
any
.security.protocol
Protocol used to communicate with brokers.
The values:
plaintext
,ssl
Default value:plaintext
enable.ssl.certificate.verification
Enable OpenSSL’s builtin broker (server) certificate verification. The values can be
true
orfalse
. The default value istrue
.ssl.key.location
Path to client’s private key (PEM) used for authentication.
ssl.key.pem
Client’s private key string (PEM format) used for authentication.
ssl.key.password
Private key passphrase
ssl.certificate.location
Path to client’s public key (PEM) used for authentication.
ssl.certificate.pem
Client’s public key string (PEM format) used for authentication.
ssl.ca.location
File or directory path to CA certificate(s) for verifying the broker’s key.
ssl.ca.pem
CA certificate string (PEM format) for verifying the broker’s key.
ssl.cipher.suites
A cipher suite is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. See manual page for
ciphers(1)
and `SSL_CTX_set_cipher_list(3).ssl.crl.location
Path to CRL for verifying broker’s certificate validity.
socket.timeout.ms
Default timeout for network requests. Consumer: FetchRequests will use fetch.wait.max.ms + socket.timeout.ms.
The default value is 60000 ms.
socket.connection.setup.timeout.ms
Maximum time allowed for broker connection setup (TCP connection setup as well as SSL and SASL handshake). If the connection to the broker is not fully functional after this, the connection will be closed and retried.
The default value is 30000 ms.
request.timeout.ms
The ack timeout of the producer request in milliseconds. This value is only enforced by the broker and relies on request.required.acks being != 0.
The default value is 30000 ms.
delivery.timeout.ms
Local message timeout. This value is only enforced locally and limits the time a produced message waits for successful delivery. A time of 0 is infinite. This is the maximum time lua-resty-kafka-fast may use to deliver a message (including retries). The delivery error occurs when either the retry count or the message timeout is exceeded. The message timeout is automatically adjusted to transaction.timeout.ms if transactional.id is configured.
The default value is 300000 ms.
message_handler
Specifies the message handler, which handles each message after the producer has finished sending it. syntax: message_handler = function (topic, key, message, partition, offset, err) end. Whether the message was sent successfully or not, the message_handler function is executed.
error_handler
Specifies the error handler, which handles errors reported by the underlying driver. syntax: error_handler = function (err) end. You can log the error message via ngx.log.
The third optional cluster_name
specifies the name of the cluster, default 1 (note that it is a number).
You can specify different names when you have two or more kafka clusters.
And this only works with async
producer_type.
producer.send
syntax: msg, err = producer:send(topic, key, message)
Produce and send a single message to broker.
The topic
argument is a Lua string
and is required
.
The key
argument is a Lua string
and is optional
.
The message
is a Lua string
and is required
.
producer.send_multi
syntax: msg, err = producer:send_multi(messages)
Produce and send multiple messages to broker.
The messages argument is a lua array
.
Each array member is a lua table
, which contains three fields: topic
, key
, and message
.
For example,
local messages = {
{topic="test-1", key = "k1", message="abc"},
{topic="test-2", key = "k2", message="def"},
{topic="test-3", key = "k3", message="hig"},
}
producer:send_multi(messages)
describe_group
syntax: msg, err = kafka.describe_group(brokers, opts, group_id, topics)
Fetch the offset information of the topic owned by the group.
The brokers
, opts
, group_id
arguments are just like the arguments in
kafka.new_consumer
.
The optional topics
argument is a Lua table. each element in topics
contains
two fields: topic
and partition
.
For example:
local msg, err = kafka.describe_group(
"127.0.0.1:9094",
{["broker.address.family"]="v4"},
"group-name")
local msg, err = kafka.describe_group(
"127.0.0.1:9094",
{["broker.address.family"]="v4"},
"group-name",
{{topic = "topic1", partition = 0}})
Copyright & Licenses
Copyright (C) 2024 OpenResty Inc. All rights reserved.
This software is proprietary and must not be redistributed or shared at all.