24 Practical Hands on Implementing a Memcached Server for 4 Layer Traffic

24 Practical Hands-On- Implementing a Memcached Server for 4-Layer Traffic #

Hello, I am Wen Ming.

In the previous lessons, we have introduced several Lua APIs for handling requests, but they are all related to layer 7. In addition to that, OpenResty actually provides the stream-lua-nginx-module module to handle layer 4 traffic. The directives and APIs it provides are basically the same as those in lua-nginx-module.

Today, I will show you how to use OpenResty to implement a memcached server, which can be done in just over 100 lines of code. In this small practical, we will use a lot of the content we have learned before and introduce some content from the testing and performance optimization chapters.

So, I hope you can understand one thing clearly: the focus of this lesson is not on understanding the specific function of every line of code, but on having a comprehensive understanding of how to develop a project from scratch with OpenResty, from the perspectives of requirements, testing, and development.

Original Requirements and Technical Solution #

Before starting development, we need to understand what the requirements are and what problems they are meant to solve. Otherwise, we may get lost in the choice of technologies. For example, when you see our topic today, you should first ask yourself, why do we need to implement a memcached server? Why not just install the original memcached or redis?

We know that HTTPS traffic is becoming mainstream, but some older browsers do not support session tickets. In this case, we need to store the session ID on the server side. If local storage space is insufficient, we need a cluster for storage. Since this data can be discarded, using memcached is more appropriate.

At this point, the simplest and most direct solution would be to directly introduce memcached. However, I chose to use OpenResty to build a solution for the following reasons:

  • First, introducing memcached directly would add an additional process, increasing deployment and maintenance costs.
  • Second, this requirement is simple enough, only requiring get and set operations with support for expiration.
  • Third, OpenResty has a stream module that allows us to quickly implement this requirement.

Since we want to implement a memcached server, we need to first understand its protocol. The memcached protocol can support TCP and UDP, but here I choose TCP. The specific protocol for the get and set commands is as follows:

Get
Get the value based on the key
Telnet command: get <key>*\r\n

Example:
get key
VALUE key 0 4 data END



Set
Store a key-value pair in memcached
Telnet command: set <key> <flags> <exptime> <bytes> [noreply]\r\n<value>\r\n

Example:
set key 0 900 4 data
STORED

In addition to get and set, we also need to know how memcached handles “error handling” in its protocol. “Error handling” is very important for server-side programs. When writing programs, we need to handle not only normal requests, but also various exceptions. For example, in the following scenarios:

  • Memcached sends a request other than get or set, how should I handle it?
  • If the server encounters an error, what kind of feedback should I give to the memcached client?

At the same time, we hope to write a client program that is compatible with memcached. This way, users don’t have to distinguish whether it’s the official memcached version or the version implemented by OpenResty.

The following diagram from the memcached documentation describes what should be returned and the specific format when an error occurs, and you can use it as a reference:

Now, let’s determine the technical solution. We know that OpenResty’s shared dict can be accessed across multiple workers, and putting data in shared dict is very similar to putting it in memcached - they both support get and set operations, and the data is lost after process restart. Therefore, using shared dict to simulate memcached is very appropriate, as their principles and behaviors are consistent.

Test-driven development #

Next, we are going to start working on it. However, following the test-driven development approach, before writing the actual code, let’s first construct a simplest test case. We won’t use the test::nginx framework here, since its learning curve is also not low. Instead, let’s try manually testing it using the familiar resty:

$ resty -e 'local memcached = require "resty.memcached"
    local memc, err = memcached:new()

    memc:set_timeout(1000) -- 1 sec
    local ok, err = memc:connect("127.0.0.1", 11212)
    local ok, err = memc:set("dog", 32)
    if not ok then
        ngx.say("failed to set dog: ", err)
        return
    end

    local res, flags, err = memc:get("dog")
    ngx.say("dog: ", res)'

This testing code uses the lua-rety-memcached client library to initiate connect and set operations, assuming that the memcached server is listening on port 11212 of the local machine.

It looks like there should be no problem. You can execute this code on your own machine. If everything goes well, it should return an error message like failed to set dog: closed, because the service is not running at this moment.

So far, your technical solution has been made clear, which is to use the stream module to receive and send data, and at the same time, use shared dict to store the data.

The indicator to measure whether the requirement is completed is also clear, which is to run the above code successfully and print out the actual value of “dog”.

Building the Framework #

So, what are you waiting for? Let’s start writing some code!

Personally, I prefer to first build a minimal runnable code framework, and then gradually fill in the code. The benefit of this approach is that you can set many small goals for yourself during the coding process. Moreover, completing a small goal will provide positive feedback through the test cases.

Let’s start by setting up the Nginx configuration file because the stream and shared dict need to be preconfigured there. Below is the configuration file I have set up:

stream {
    lua_shared_dict memcached 100m;
    lua_package_path 'lib/?.lua;;';
    server {
        listen 11212;
        content_by_lua_block {
            local m = require("resty.memcached.server")
            m.run()
        }
    }
}

In this configuration file, you can see several key pieces of information:

  • First, the code runs in the stream context of Nginx, not the HTTP context, and it listens on port 11212.
  • Second, the name of the shared dict is memcached and its size is 100MB. These cannot be modified during runtime.
  • In addition, the code is located in the lib/resty/memcached directory, the filename is server.lua, and the entry function is run(). You can find this information in lua_package_path and content_by_lua_block.

Next, let’s build the code framework. You can try it yourself first, and then we can take a look at my framework code:

local new_tab = require "table.new"
local str_sub = string.sub
local re_find = ngx.re.find
local mc_shdict = ngx.shared.memcached

local _M = { _VERSION = '0.01' }

local function parse_args(s, start)
end

function _M.get(tcpsock, keys)
end

function _M.set(tcpsock, res)
end

function _M.run()
    local tcpsock = assert(ngx.req.socket(true))

    while true do
        tcpsock:settimeout(60000) -- 60 seconds
        local data, err = tcpsock:receive("*l")

        local command, args
        if data then
            local from, to, err = re_find(data, [[(\S+)]], "jo")
            if from then
                command = str_sub(data, from, to)
                args = parse_args(data, to + 1)
            end
        end

        if args then
            local args_len = #args
            if command == 'get' and args_len > 0 then
                _M.get(tcpsock, args)
            elseif command == "set" and args_len == 4 then
                _M.set(tcpsock, args)
            end
        end
    end
end

return _M

This code implements the main logic of the entry function run(). Although I haven’t done error handling yet and the parse_args, get, and set functions it depends on are empty functions, this framework fully expresses the logic of the memcached server.

Fill in the code #

Next, let’s implement these empty functions one by one according to the execution order of the code.

First, we can parse the arguments of the memcached command based on the protocol of memcached:

local function parse_args(s, start)
    local arr = {}

    while true do
        local from, to = re_find(s, [[\S+]], "jo", {pos = start})
        if not from then
            break
        end

        table.insert(arr, str_sub(s, from, to))

        start = to + 1
    end

    return arr
end

Here, my suggestion is to first implement a version in the most straightforward way without considering any performance optimizations. After all, completion is always more important than perfection, and gradual optimization based on completion can approach perfection.

Next, let’s implement the get function. It can query multiple keys at once, so I used a for loop in the following code:

function _M.get(tcpsock, keys)
    local reply = ""

    for i = 1, #keys do
        local key = keys[i]
        local value, flags = mc_shdict:get(key)
        if value then
            local flags  = flags or 0
            reply = reply .. "VALUE" .. key .. " " .. flags .. " " .. #value .. "\r\n" .. value .. "\r\n"
        end
    end
    reply = reply ..  "END\r\n"

    tcpsock:settimeout(1000)  -- one second timeout
    local bytes, err = tcpsock:send(reply)
end

In fact, the most core line of code here is only one: local value, flags = mc_shdict:get(key), which retrieves data from the shared dict; as for the rest of the code, it concatenates strings according to the memcached protocol and finally sends them to the client.

Finally, let’s look at the set function. It converts the received parameters into the format required by the shared dict API, stores the data, and handles it according to the memcached protocol in case of errors:

function _M.set(tcpsock, res)
    local reply =  ""

    local key = res[1]
    local flags = res[2]
    local exptime = res[3]
    local bytes = res[4]

    local value, err = tcpsock:receive(tonumber(bytes) + 2)

    if str_sub(value, -2, -1) == "\r\n" then
        local succ, err, forcible = mc_shdict:set(key, str_sub(value, 1, bytes), exptime, flags)
        if succ then
            reply = reply .. "STORED\r\n"
        else
            reply = reply .. "SERVER_ERROR " .. err .. "\r\n"
        end
    else
        reply = reply .. "ERROR\r\n"
    end

    tcpsock:settimeout(1000)  -- one second timeout
    local bytes, err = tcpsock:send(reply)
end

In addition, during the process of filling in the above functions, you can use test cases for verification and use ngx.log for debugging. Unfortunately, there is no breakpoint debugging tool in OpenResty, so we use ngx.say and ngx.log for debugging. In this regard, it can be said that we are still in the primitive era.

Closing Remarks #

This practical project is almost coming to an end. Finally, I would like to leave you with a hands-on assignment. Can you run the complete implementation code of the memcached server provided above and pass the test cases?

Today’s assignment might require quite a bit of effort from you. However, keep in mind that this is just a basic version without error handling, performance optimization, and automated testing. We will continue to improve on these aspects in the future. I also hope that through your learning in the upcoming lessons, you will eventually be able to complete a more advanced version.

If you have any doubts or questions regarding today’s lesson or your own practice, please feel free to leave a comment to discuss with me. You are also welcome to share this article with your colleagues and friends. Let’s practice together and make progress together.