Erlang: Servidores Concurrentes TCP

Para la programación de elementos que requieran concurrencia, Erlang, es una de las mejores elecciones que se puede tomar. El hecho de construir servidores UDP con los mecanismos que nos ofrece OTP es algo trivial, muy simple de conseguir, tal y como se puede ver en este otro artículo, pero la cosa se complica, cuando lo que queremos obtener es una conexión, o un conjunto de conexiones concurrentes para TCP.

Para comenzar, vamos a poner de nuevo de manifiesto la plantilla del gen_server, si se quieren tener más referencias sobre el gen_server se puede consultar este otro artículo, o de forma más extensa en este artículo del Blog Aprendiendo Erlang.

Por lo tanto, la estructura de un código con gen_server es la siguiente:

-module(server).
-author('bombadil@bosqueviejo.net').
 
-behaviour(gen_server).
 
-define(SERVER, ?MODULE).
 
-export([start_link/]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).
 
-record(state, {}).
 
start_link() ->
    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
 
init([]) ->
    {ok, #state{}}.
 
handle_call(_Request, _From, State) ->
    Reply = ok,
    {reply, Reply, State}.
 
handle_cast(_Msg, State) ->
    {noreply, State}.
 
handle_info(_Info, State) ->
    {noreply, State}.
 
terminate(_Reason, _State) ->
    ok.
 
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

Agregando conexiones TCP

TCP, a diferencia de UDP, necesita formalizar que está escuchando en un nodo, y aceptar cada petición entrante para mantener una comunicación bilateral. En este caso, el init se complica. Debemos de establecer la escucha del puerto y esto requiere de que pasemos algunos parámetro y opciones:

-record(state, {lsocket, socket}).
 
init([]) ->
    Opts = [binary, {packet, }, {active, true}],
    case gen_tcp:listen(5000, Opts) of
        {ok, LSocket} ->
            {ok, #state{lsocket = LSocket}, };
        {error, Reason} ->
            {stop, Reason}
    end.

Para esto, tendremos que agregar el parámetro lsocket y socket al registro de estado.

La función init se encarga de establecer la escucha en el puerto 5000, de forma activa. Para más opciones se puede revisar tanto la documentación de gen_tcp, como las opciones específicas disponibles en el paquete inet a través de setops.

Como se puede ver, en caso de que la escucha se establezca de forma apropiada, hay un timeout configurado a cero, por lo que, se llamará al callback de handle_info, con el parámetro timeout. En ese punto, es donde tenemos que agregar la parte del servidor TCP en la que se espera por una conexión entrante para poder atenderla. La función, por tanto, sería:

handle_info(timeout, State=#state{lsocket=LSocket}) ->
    {ok, Socket} = gen_tcp:accept(LSocket),
    {noreply, State#state{socket=Socket}};
 
handle_info(Info, State=#state{socket=Socket}) ->
    io:format("~p~n", [Info]),
    gen_tcp:send(Socket, io_lib:format("~p~n", [{date(),time()}])),
    gen_tcp:close(Socket),
    {noreply, State, }.

El programa es algo simple, ya que acepta la conexión, espera una transmisión y responde enviando la fecha y hora y cierra la conexión. Podemos probarlo de la siguiente forma:

{% raw %}

$ nc localhost 5000
hola
{{2012,4,4},{20,5,49}}
{% endraw %}

Y ahora… la concurrencia

Como puedes observar, mientras tienes abierta la primera conexión, la segunda no se establece porque el sistema está ocupado con la primera petición. Esto conlleva un cuello de botella, ya que la concurrencia es nula.

Para conseguir ampliar la concurrencia, desde el primer servidor, el proceso que tiene en escucha el puerto, debe de lanzar un proceso, un nuevo servidor para atender cada petición de forma independiente.

Para esto necesitamos crear otro nuevo gen_server, pero con unas características diferentes, ya que no se mantendrá a la espera de conexiones entrantes, sino que será para manejar una conexión activa.

El módulo gen_tcp nos ayuda en esta tarea, ya que cada mensaje entrante lo recibimos a través de handle_info en el proceso padre, como habíamos visto antes, y esto debemos de redireccionarlo hacia el proceso hijo que creemos. En principio, copiando la estructura de gen_server de nuevo y llamándola request, agregamos un par de cambios como el siguiente:

start_link(Socket) ->
    {ok, Pid} = gen_server:start_link(?MODULE, [Socket], []),
    gen_tcp:controlling_process(Socket,Pid),
    inet:setopts(Socket, [{active, once}]),
    {ok, Pid}.

Esta forma de lanzar el gen_server, hace que el proceso no tenga un nombre asociado, por lo que se pueden lanzar tantos como queramos o necesitemos. Para el gen_tcp, indicamos que el proceso que atenderá las peticiones de ese socket será el que se acaba de crear, y se le indica que puede enviar una petición.

La función de inicialización tendrá esta forma:

-record(state, {socket}).
 
init([Socket]) ->
    {ok, #state{socket=Socket}}.

En el módulo server podemos eliminar la parte de gestión de la petición (la parte de handle_info que se refiere estrictamente a la gestión de la petición) y dejar solo la parte de la aceptación de la petición de esta forma:

handle_info(timeout, State=#state{lsocket=LSocket}) ->
    {ok, Socket} = gen_tcp:accept(LSocket),
    request:start_link(Socket),
    {noreply, State, }.

En el nuevo módulo request, agregamos la parte de código que se encarga de realizar la contestación:

handle_info(Info, State=#state{socket=Socket}) ->
    io:format("~p~n", [Info]),
    gen_tcp:send(Socket, io_lib:format("~p~n", [{date(),time()}])),
    gen_tcp:close(Socket),
    {stop, normal, State}.

En este caso, cuando gen_tcp envía un mensaje desde el cliente al servidor, este mensaje disparará la ejecución de esta función, imprimirá por pantalla el mensaje recibido, contestará de forma fija con la fecha y hora actual, cerrará la conexión con el cliente y detendrá la ejecución del proceso.

Algunas pruebas

Si ejecutamos en una máquina virtual de Erlang el servidor, y abrimos dos consolas o tres o más con el siguiente comando:

$ nc localhost 5000

Veremos que todas y cada una de las consolas obtienen conexión y quedan en el proceso hijo a la espera de un mensaje que envíes. Puedes poner el texto que quieras y presionar el retorno de carro. Volviendo a la consola de Erlang, se verá el mensaje escrito en cada una de las consolas.

Si modificamos el código para que acepte paquetes de tipo http y procesamos una respuesta válida, podemos probar la carga a través de programas como wrk o ab. En mis pruebas, he conseguido:

$ ab -n 10000 -c 50 http://127.0.0.1:5000/
[...]
Concurrency Level:      50
Time taken for tests:   0.885 seconds
Complete requests:      10000
Failed requests:        0
Write errors:           0
Total transferred:      760000 bytes
HTML transferred:       110000 bytes
Requests per second:    11303.38 [#/sec] (mean)
Time per request:       4.423 [ms] (mean)
Time per request:       0.088 [ms] (mean, across all concurrent requests)
Transfer rate:          838.92 [Kbytes/sec] received

Una tasa de unas 11 mil peticiones por segundo. Nada mal.