lua-resty-kafka-fast

Name

lua-resty-kafka-fast - Lua Kafka client driver for OpenResty.

Table of Contents

Status

This library is considered production-ready.

Back to TOC

Description

This Lua library is a Kafka client driver for working with the ngx_lua nginx module:

https://github.com/openresty/lua-nginx-module/#readme

Back to TOC

Synopsis

load_module "/usr/local/openresty/nginx/modules/ngx_http_lua_kafka_module.so";

thread_pool lua_kafka threads=16;

http {
    lua_kafka_max_clients 16;

    # you do not need the following line if you are using
    # the OpenResty bundle:
    lua_package_path "/usr/local/openresty/site/lualib/?.lua;;";

    init_worker_by_lua_block {
        local is_kafka_client_running = false

        local function kafka_consumer_imp()
            local kafka = require "resty.kafka.fast"
            local topics = {{topic = "topic1"}, {topic = "topic2"}}
            local consumer, err = kafka.new_consumer("127.0.0.1:9094,127.0.0.2:9094",
                                    {["auto.offset.reset"] = "beginning",
                                    ["broker.address.family"]="v4"},
                                    "group-name", topics)
            if not consumer then
                ngx.log(ngx.ERR, "create kafka consumer failed: ", err)
                return
            else
                ngx.log(ngx.INFO, "create kafka consumer success")
            end

            -- consume 100 message
            for i = 1, 100 do
                if ngx.worker.exiting() then
                    break
                end

                local msg, err = consumer:read()
                if not msg then
                    -- 'read timeout' is not a fatal error.
                    -- just because there are not message from kafka server
                    -- so you can skip the 'read timeout' error
                    if err ~= "read timeout" then
                        ngx.log(ngx.ERR, "read failed: ", err)
                    end
                else
                    ngx.log(ngx.ERR, "topic: ", msg.topic or "",
                            ", key: ", msg.key or "",
                            ", payload: ", msg.payload or  "",
                            ", offset: ", msg.offset or "",
                            ", partition: ", msg.partition or "")
                end
            end

            consumer:close()
        end

        local function kafka_consumer(premature)
            if premature or is_kafka_client_running then
                return
            end

            is_kafka_client_running = true

            local ok, err = pcall(kafka_consumer_imp)
            if not ok then
                ngx.log(ngx.ERR, err)
            end

            is_kafka_client_running = false
        end

        local function kafka_producer(premature)
            local kafka = require "resty.kafka.fast"

            local function msg_hdl(topic, key, message, partition, offset, err)
                ngx.log(ngx.INFO, "topic: ", topic, ", key: ", key,
                        ", message: ", #message, ", partition: ", partition,
                        ", offset: ", offset, ", error: ", err)
            end

            local function error_handler(err)
                ngx.log(ngx.ERR, "kafka producer error: ", err)
            end

            local p, err = kafka.new_producer("127.0.0.1:9094,127.0.0.2:9094",
                                              {
                                                 producer_type = "async", -- default is "sync"
                                                 error_handler = error_handler, -- optional
                                                 message_handler = message_handler, -- optional
                                              })
            if p == nil then
                ngx.log(ngx.ERR, "failed to create kafka producer: ", err)
                return
            end

            local messages = {
                                  {topic="test-1", key = "k1", message="abc"},
                                  {topic="test-2", key = "k2", message="def"},
                                  {topic="test-3", key = "k3", message="hig"},
                             }
            local res, err = p:send_multi(messages)
            if not res then
                ngx.log(ngx.ERR, "failed to send kafka messages: ", err)
                return
            end

            res, err = p:send("topic-name", "key", "value")
            if not res then
                ngx.log(ngx.ERR, "failed to send kafka messages: ", err)
                return
            end
        end

        local ok, err = ngx.timer.every(10, kafka_consumer)
        if not ok then
            ngx.log(ngx.ERR, "failed to set timer for kafka_consumer: ", err)
            return
        end

        local ok, err = ngx.timer.every(10, kafka_producer)
        if not ok then
            ngx.log(ngx.ERR, "failed to set timer for kafka_producer: ", err)
            return
        end
    }
}

Back to TOC

Directives

lua_kafka_max_clients

syntax: lua_kafka_max_clients [num]

default: lua_kafka_max_clients 128

context: http

Specifies the maximum number of entries allowed in the worker process level for the kafka client.

Each client consumes one thread, so the number of clients should be the same as the value configured for the thread pool lua_kafka.

Back to TOC

Methods

new_consumer

syntax: consumer, err = kafka.new_consumer(brokers, opts, group_id, topics)

Creates a Kafka consumer object. In case of failures, returns nil and a string describing the error.

The brokers argument is a comma-separated list of hosts or hosts: ports (the default port is 9092). lua-resty-kafka-fast will use the bootstrap brokers to acquire the complete set of brokers from the cluster.

The opts argument is a Lua table holding the following keys:

  • auto.offset.reset

    Action to take when there is no initial offset in the offset store or the desired offset is out of range: ‘smallest’,’earliest’ - automatically reset the offset to the smallest offset, ’largest’,’latest’ - automatically reset the offset to the largest offset, ’error’ - trigger an error which is retrieved by consuming messages and checking err.

    The values: smallest, earliest, beginning, largest, latest, end, error.

    Default values: largest

  • broker.address.family

    Allowed broker IP address families: any, v4, v6   The values: any v4, v6.

    Default value: any.

  • enable.partition.eof

    Emit RD_KAFKA_RESP_ERR__PARTITION_EOF event whenever the consumer reaches the end of a partition.

    The values: true, false

    Default value: true

  • security.protocol

    Protocol used to communicate with brokers.

    The values: plaintext, ssl

    Default value: plaintext

  • enable.ssl.certificate.verification

    Enable OpenSSL’s builtin broker (server) certificate verification. The values can be true or false. The default value is true.

  • ssl.key.location

    Path to client’s private key (PEM) used for authentication.

  • ssl.key.pem

    Client’s private key string (PEM format) used for authentication.

  • ssl.key.password

    Private key passphrase

  • ssl.certificate.location

    Path to client’s public key (PEM) used for authentication.

  • ssl.certificate.pem

    Client’s public key string (PEM format) used for authentication.

  • ssl.ca.location

    File or directory path to CA certificate(s) for verifying the broker’s key.

  • ssl.ca.pem

    CA certificate string (PEM format) for verifying the broker’s key.

  • ssl.cipher.suites

    A cipher suite is a named combination of authentication, encryption, MAC, and key exchange algorithms that are used to negotiate the security settings for a network connection using TLS or SSL network protocol. See manual page for ciphers(1) and `SSL_CTX_set_cipher_list(3).

  • ssl.crl.location

    Path to CRL to verify the validity of the broker’s certificate.

  • socket.timeout.ms

    Default timeout for network requests. Consumer: FetchRequests will use fetch.wait.max.ms + socket.timeout.ms.

    The default value is 60000 ms.

  • socket.connection.setup.timeout.ms

    The maximum time allowed for broker connection setup (TCP connection setup as well as SSL and SASL handshake) is 30 minutes. If the connection to the broker is not fully functional after this, it will be closed and retried.

    The default value is 30000 ms.

  • fetch.wait.max.ms

    Maximum time the broker may wait to fill the Fetch response with fetch.min.bytes of messages.

    The default value is 500 ms.

    The group_id argument is a Lua string holding the group ID of the consumer. This argument can be nil.

    The topics argument is a Lua table holding the following one or more topics. Each topic contains the following fileds:

  • topic

    The topic to be consumed. This field must be specified.

  • partition

    The number of partitions to be consumed. This field is optional.

  • offset

    The start offset of the topic to be consumed. This field is optional.

    The values: kafka.OFFSET_BEGINNING, kafka.OFFSET_END, kafka.OFFSET_STORED

For example:

{ topic = "topic-name", partition = 0, offset = 1000 }

or

{ topic = "topic-name" }

Back to TOC

consumer.read

syntax: msg, err = consumer:read()

Read a message from the Kafka server. In case of failures, returns nil and a string describing the error.

Consumer errors are generally to be considered informational as the consumer library will automatically try to recover from all types of errors. To prevent Lua code from being blocked for long periods of time, a “read timeout” error message is returned if no message is received from the Kafka server for more than 1s.

Here are some error messages you might be interested in:

  • “read timeout”: This error gives you the opportunity to exit the message handler function if you don’t receive a message from kafka for a long time.
  • “Subscribed topic not available: topic-ts1-tc23: Broker: Unknown topic or partition”
  • “Fetch from broker 101 reached end of partition at offset 4 (HighwaterMark 4)”

Back to TOC

new_producer

syntax: consumer, err = kafka.new_producer(brokers, opts, cluster_name)

Creates a Kafka producer object. In case of failures, returns nil and a string describing the error.

The brokers argument is a comma-separated list of hosts or hosts: ports (the default port is 9092). lua-resty-kafka-fast will use the bootstrap brokers to acquire the full set of brokers from the cluster.

The opts argument is a Lua table holding the following keys:

  • producer_type

    Specifies the producer.type. “async” or “sync”

  • broker.address.family

    Allowed broker IP address families: any, v4, v6

    The values: any v4, v6.

    Default value: any.

  • security.protocol

    Protocol used to communicate with brokers.

    The values: plaintext, ssl Default value: plaintext

  • enable.ssl.certificate.verification

    Enable OpenSSL’s builtin broker (server) certificate verification. The values can be true or false. The default value is true.

  • ssl.key.location

    Path to client’s private key (PEM) used for authentication.

  • ssl.key.pem

    Client’s private key string (PEM format) used for authentication.

  • ssl.key.password

    Private key passphrase

  • ssl.certificate.location

    Path to client’s public key (PEM) used for authentication.

  • ssl.certificate.pem

    Client’s public key string (PEM format) used for authentication.

  • ssl.ca.location

    File or directory path to CA certificate(s) for verifying the broker’s key.

  • ssl.ca.pem

    CA certificate string (PEM format) for verifying the broker’s key.

  • ssl.cipher.suites

    A cipher suite is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. See manual page for ciphers(1) and `SSL_CTX_set_cipher_list(3).

  • ssl.crl.location

    Path to CRL for verifying broker’s certificate validity.

  • socket.timeout.ms

    Default timeout for network requests. Consumer: FetchRequests will use fetch.wait.max.ms + socket.timeout.ms.

    The default value is 60000 ms.

  • socket.connection.setup.timeout.ms

    Maximum time allowed for broker connection setup (TCP connection setup as well SSL and SASL handshake). If the connection to the broker is not fully functional after this, the connection will be closed and retried.

    The default value is 30000 ms.

  • request.timeout.ms

    The ack timeout of the producer request in milliseconds. This value is only enforced by the broker and relies on request.required.acks being != 0.

    The default value is 30000 ms.

  • delivery.timeout.ms

    Local message timeout. This value is only enforced locally and limits the time a produced message waits for successful delivery. A time of 0 is infinite. This is the maximum time lua-resty-kafka-fast may use to deliver a message (including retries). The delivery error occurs when either the retry count or the message timeout is exceeded. The message timeout is automatically adjusted to transaction.timeout.ms if transactional.id is configured.

    The default value is 300000 ms.

  • message_handler

    Specifies the message handler, handle message when producer sent done. syntax: message_handler = function (topic, key, message, partition, offset, err) end. Whether this message was sent successfully or unsuccessfully, the message_handler function is executed.

  • error_handler

    Specifies the error handler, handle error reported by the underlying driver. syntax: error_handler = function (err) end. You can log the error message for via ngx.log.

The third optional cluster_name specifies the name of the cluster, default 1 (yeah, it’s number). You can Specifies different names when you have two or more kafka clusters. And this only works with async producer_type.

Back to TOC

producer.send

syntax: msg, err = producer:send(topic, key, message)

Produce and send a single message to broker.

The topic argument is a Lua string and is required.

The key argument is a Lua string and is optional.

The message is a Lua string ans is required.

Back to TOC

producer.send_multi

syntax: msg, err = producer:send(messages)

Produce and send multiple messages to broker.

The messages argument is a lua array. Each array member is a lua table, which contains three fields: topic, key, and message.

For example,

local messages = {
 {topic="test-1", key = "k1", message="abc"},
 {topic="test-2", key = "k2", message="def"},
 {topic="test-3", key = "k3", message="hig"},
}

producer:send(messages)

Back to TOC

describe_group

syntax: consumer, err = kafka.describe_group(brokers, opts, group_id, topics)

Fetch the offset information of the topic owned by the group.

The brokers, opts, group_id arguments are just like the arguments in kafka.new_consumer.

The optional topics argument is a Lua table. each element in topics contains two fields: topic and partition.

For example:

local msg, err = kafka.describe_group(
    "127.0.0.1:9094",
    {["broker.address.family"]="v4"},
    "group-name")
local msg, err = kafka.describe_group(
    "127.0.0.1:9094",
    {["broker.address.family"]="v4"},
    "group-name",
    {{topic = "topic1", partition = 0}})

Back to TOC

Copyright & Licenses

Copyright (C) 2024 OpenResty Inc. All rights reserved.

This software is proprietary and must not be redistributed or shared at all.

Back to TOC