View source with raw comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jan Wielemaker
    4    E-mail:        J.Wielemaker@vu.nl
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2014-2018, VU University Amsterdam
    7                              CWI Amsterdam
    8    All rights reserved.
    9
   10    Redistribution and use in source and binary forms, with or without
   11    modification, are permitted provided that the following conditions
   12    are met:
   13
   14    1. Redistributions of source code must retain the above copyright
   15       notice, this list of conditions and the following disclaimer.
   16
   17    2. Redistributions in binary form must reproduce the above copyright
   18       notice, this list of conditions and the following disclaimer in
   19       the documentation and/or other materials provided with the
   20       distribution.
   21
   22    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   23    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   24    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   25    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   26    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   27    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   28    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   29    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   30    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   31    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   32    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   33    POSSIBILITY OF SUCH DAMAGE.
   34*/
   35
   36:- module(hub,
   37          [ hub_create/3,               % +HubName, -Hub, +Options
   38            hub_add/3,                  % +HubName, +Websocket, ?Id
   39            hub_send/2,                 % +ClientId, +Message
   40            hub_broadcast/2,            % +HubName, +Message
   41            hub_broadcast/3,            % +HubName, +Message, +Condition
   42            current_hub/2               % ?HubName, ?Hub
   43          ]).   44:- use_module(library(debug)).   45:- use_module(library(error)).   46:- use_module(library(apply)).   47:- use_module(library(gensym)).   48:- if(exists_source(library(uuid))).   49:- use_module(library(uuid)).   50:- endif.   51:- use_module(library(ordsets)).   52:- use_module(library(http/websocket)).   53
   54:- meta_predicate
   55    hub_broadcast(+,+,1).

Manage a hub for websockets

This library manages a hub that consists of clients that are connected using a websocket. Messages arriving at any of the websockets are sent to the event queue of the hub. In addition, the hub provides a broadcast interface. A typical usage scenario for a hub is a chat server. A scenario for realizing a chat server is:

  1. Create a new hub using hub_create/3.
  2. Create one or more threads that listen to Hub.queues.event from the created hub. These threads can update the shared view of the world. A message is a dict as returned by ws_receive/2 or a hub control message. Currently, the following control messages are defined:
    hub{error:Error, left:ClientId, reason:Reason}
    A client left us because of an I/O error. Reason is read or write and Error is the Prolog I/O exception.
    hub{joined:ClientId}
    A new client has joined the chatroom.

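    The thread(s) can talk to clients using two predicates: hub_send/2 sends a message to a single client, and hub_broadcast/2 (or hub_broadcast/3) sends a message to all clients of the hub (or to those satisfying a condition).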

A hub consists of (currently) four message queues and a simple dynamic fact. Threads that are needed for the communication tasks are created on demand and die if no more work needs to be done.

To be done
-
The current design does not use threads to perform tasks for multiple hubs. This implies that the design scales rather poorly for hosting many hubs with few users. */
   94:- dynamic
   95    hub/2,                          % Hub, Queues ...
   96    websocket/5.                    % Hub, Socket, Queue, Lock, Id
   97
   98:- volatile hub/2, websocket/5.
 hub_create(+Name, -Hub, +Options) is det
Create a new hub. Hub is a dict containing the following public information:
Hub.name
The name of the hub (the Name argument)
Hub.queues.event
Message queue to which the hub thread(s) can listen.

After creating a hub, the application normally creates a thread that listens to Hub.queues.event and exposes some mechanisms to establish websockets and add them to the hub using hub_add/3.

See also
- http_upgrade_to_websocket/3 establishes a websocket from the SWI-Prolog webserver.
  %!  hub_create(+HubName, -Hub, +Options) is det.
  %
  %   Create the four message queues of a hub (wait, ready, event and
  %   broadcast, in that order), wrap them together with the name in a
  %   `hub` dict and register that dict as a hub/2 fact.
  %
  %   @arg HubName must be an atom (checked with must_be/2).
  %   @arg Options is accepted but currently not inspected.

  hub_create(HubName, Hub, _Options) :-
      must_be(atom, HubName),
      length(QueueList, 4),
      maplist(message_queue_create, QueueList),
      QueueList = [Wait, Ready, Event, Broadcast],
      Queues = _{ broadcast:Broadcast,
                  event:Event,
                  ready:Ready,
                  wait:Wait
                },
      Hub = hub{queues:Queues, name:HubName},
      assertz(hub(HubName, Hub)).
 current_hub(?Name, ?Hub) is nondet
True when there exists a hub Hub with Name.
  %!  current_hub(?HubName, ?Hub) is nondet.
  %
  %   Look up or enumerate the registered hubs: succeeds once for each
  %   hub/2 fact that unifies with HubName and Hub.

  current_hub(Name, HubDict) :-
      hub(Name, HubDict).
  138
  139
  140                 /*******************************
  141                 *            WAITERS           *
  142                 *******************************/
  143
  144/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  145The task of this layer is to wait   for  (a potentially large number of)
  146websockets. Whenever there is data on one   of these sockets, the socket
  147is handed to Hub.queues.ready. This is realised using wait_for_input/3,
  148which allows a single thread  to  wait   for  many  sockets.  But ... on
  149Windows it allows to wait for at most  64 sockets. In addition, there is
  150no way to add an additional input   for control messages because Windows
  151select() can only wait for sockets. On Unix   we could use pipe/2 to add
  152the control channel. On Windows  we   would  need  an additional network
  153service, giving rise to its own problems with  allocation, firewalls and
  154security.
  155
  156So, instead we keep a queue of websockets   that  need to be waited for.
  157Whenever we add a  websocket,  we  create   a  waiter  thread  that will
  158typically start waiting for this socket.   In  addition, we schedule any
  159waiting thread that has less  than  the   maximum  number  of sockets to
  160timeout at as good as we can the same   time.  All of them will hunt for
  161the same set of queues,  but  they  have   to  wait  for  each other and
  162therefore most of the time one thread will walk away with all websockets
  163and the others commit suicide because there is nothing to wait for.
  164- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
  165
  166:- meta_predicate
  167    hub_thread(0, +, +).
 hub_add(+HubName, +WebSocket, ?Id) is det
Add a WebSocket to the hub. Id is used to identify this user. It may be provided (as a ground term) or is generated as a UUID.
  %!  hub_add(+HubName, +WebSocket, ?Id)
  %
  %   Register WebSocket as a client of the hub named HubName.
  %
  %   The client gets its own output queue and mutex, which are stored
  %   in a websocket/5 fact. The socket is then put on the hub's wait
  %   queue, a hub{joined:Id} event is posted on the event queue and a
  %   waiter thread is started.
  %
  %   @arg HubName must be an atom; the call fails if no such hub is
  %        registered.
  %   @arg Id is filled using uuid/1 if it is unbound on entry.

  hub_add(HubName, WebSocket, Id) :-
      must_be(atom, HubName),
      hub(HubName, Hub),
      (   nonvar(Id)
      ->  true
      ;   uuid(Id)
      ),
      message_queue_create(OutputQueue),
      mutex_create(Lock),
      % asserta/1 puts the newest fact first, which allows for reuse of Id
      asserta(websocket(HubName, WebSocket, OutputQueue, Lock, Id)),
      Queues = Hub.queues,
      thread_send_message(Queues.wait, WebSocket),
      Joined = hub{joined:Id},
      thread_send_message(Queues.event, Joined),
      debug(hub(gate), 'Joined ~w: ~w', [HubName, Id]),
      create_wait_thread(Hub).
  190
  191:- if(\+current_predicate(uuid/1)).  192% FIXME: Proper pure Prolog random UUID implementation
  %!  uuid(-UUID) is det.
  %
  %   Fallback used when no uuid/1 is available: UUID is an atom holding
  %   the decimal digits of a random integer in the range 0..2^63-1.

  uuid(UUID) :-
      Bound is 1<<63,
      Number is random(Bound),
      format(atom(UUID), '~d', [Number]).
  196:- endif.  197
  %!  create_wait_thread(+Hub)
  %
  %   Start a temporary hub thread (task prefix `hub_wait_`) that runs
  %   wait_for_sockets/1 for Hub.

  create_wait_thread(Hub) :-
      TaskPrefix = hub_wait_,
      hub_thread(wait_for_sockets(Hub), Hub, TaskPrefix).

  %!  wait_for_sockets(+Hub)
  %
  %   Wait for the sockets on the wait queue of Hub, handling at most
  %   64 sockets in this thread.

  wait_for_sockets(Hub) :-
      MaxSockets = 64,
      wait_for_sockets(Hub, MaxSockets).
  203
  %!  wait_for_sockets(+Hub, +Max)
  %
  %   Waiter loop. Repeatedly grabs up to Max websockets from
  %   Hub.queues.wait and waits for input on them.
  %
  %     - Sockets that have data are sent to Hub.queues.ready and
  %       reader threads are started for them.
  %     - Sockets that are still silent are put back on the wait queue
  %       and the loop continues.
  %     - The loop stops when no socket could be fetched from the wait
  %       queue or when nothing is left to reschedule.
  %
  %   wait_for_set/4 returns in Left the sockets it actually waited for,
  %   i.e. the input set minus the streams that turned out not to exist
  %   anymore. The sockets to reschedule are therefore computed from
  %   Left rather than from the original Set; using Set would put
  %   closed sockets back on the wait queue every time some other
  %   socket became ready.

  wait_for_sockets(Hub, Max) :-
      Queues = Hub.queues,
      repeat,
        get_messages(Queues.wait, Max, List),
        (   List \== []
        ->  create_new_waiter_if_needed(Hub),
            sort(List, Set),
            (   debugging(hub(wait))
            ->  length(Set, Len),
                debug(hub(wait), 'Waiting for ~d queues', [Len])
            ;   true
            ),
            wait_for_set(Set, Left, ReadySet, Max),
            (   ReadySet \== []
            ->  debug(hub(ready), 'Data on ~p', [ReadySet]),
                Ready = Queues.ready,
                maplist(thread_send_message(Ready), ReadySet),
                create_reader_threads(Hub),
                % Left (not Set): do not reschedule closed sockets
                ord_subtract(Left, ReadySet, NotReadySet)
            ;   NotReadySet = Left             % timeout
            ),
            (   NotReadySet \== []
            ->  debug(hub(wait), 'Re-scheduling: ~p', [NotReadySet]),
                Wait = Queues.wait,
                maplist(thread_send_message(Wait), NotReadySet),
                fail                           % next iteration of repeat
            ;   true
            )
        ;   !                                  % nothing to wait for
        ).
  234
  %!  create_new_waiter_if_needed(+Hub)
  %
  %   Start an additional waiter thread unless the wait queue of Hub is
  %   currently empty.

  create_new_waiter_if_needed(Hub) :-
      (   message_queue_property(Hub.queues.wait, size(0))
      ->  true
      ;   create_wait_thread(Hub)
      ).
 wait_for_set(+Set0, -Left, -Ready, +Max) is det
Wait for input from Set0. Note that Set0 may contain closed websockets.
  %!  wait_for_set(+Set0, -Left, -Ready, +Max)
  %
  %   Wait for input on the streams in Set0 using wait_for_input/3 with
  %   the timeout computed by wait_timeout/3. If a stream turns out not
  %   to exist, it is removed and the wait is restarted on the
  %   remaining streams. Left is the set that was finally waited for
  %   and Ready the streams that have input.

  wait_for_set([], [], [], _) :-
      !.
  wait_for_set(Sockets, Left, Ready, Max) :-
      wait_timeout(Sockets, Max, Timeout),
      Gone = error(existence_error(stream, Stream), _),
      catch(wait_for_input(Sockets, Ready, Timeout), Gone, true),
      (   nonvar(Stream)                  % Stream was closed: drop, retry
      ->  delete(Sockets, Stream, Remaining),
          wait_for_set(Remaining, Left, Ready, Max)
      ;   Left = Sockets
      ).
 wait_timeout(+WaitForList, +Max, -TimeOut) is det
Determine the timeout, such that multiple threads waiting for less than the maximum number of sockets time out at the same moment and we can combine them on a single thread.
  265:- dynamic
  266    scheduled_timeout/1.  267
  %!  wait_timeout(+WaitForList, +Max, -Timeout)
  %
  %   Timeout is `infinite` if WaitForList holds exactly Max sockets.
  %   Otherwise it is the number of seconds until the shared deadline
  %   kept in scheduled_timeout/1, so that all partially filled waiters
  %   time out at the same moment.

  wait_timeout(List, Max, Timeout) :-
      length(List, Max),
      !,
      Timeout = infinite.
  wait_timeout(_, _, Timeout) :-
      get_time(Now),
      shared_deadline(Now, Deadline),
      Timeout is Deadline - Now.

  % shared_deadline(+Now, -Deadline)
  %
  % Reuse the recorded deadline if it is still in the future; otherwise
  % record a new one.

  shared_deadline(Now, Deadline) :-
      scheduled_timeout(Scheduled),
      !,
      (   Scheduled > Now
      ->  Deadline = Scheduled
      ;   retractall(scheduled_timeout(_)),
          schedule_deadline(Now, Deadline)
      ).
  shared_deadline(Now, Deadline) :-
      schedule_deadline(Now, Deadline).

  % schedule_deadline(+Now, -Deadline)
  %
  % Record a deadline at the first whole second that is at least one
  % second after Now.

  schedule_deadline(Now, Deadline) :-
      Deadline is ceiling(Now) + 1,
      asserta(scheduled_timeout(Deadline)).
 get_messages(+Queue, +Max, -List) is det
Get the next Max messages from Queue or as many as there are available without blocking very long. This routine is designed such that if multiple threads are running for messages, one gets all of them and the others nothing.
  %!  get_messages(+Queue, +Max, -List)
  %
  %   List holds at most Max messages taken from Queue. Collection runs
  %   under the mutex `hub_wait`, so competing threads take turns.

  get_messages(Queue, Max, Messages) :-
      with_mutex(hub_wait,
                 get_messages_sync(Queue, Max, Messages)).

  % get_messages_sync(+Queue, +Max, -List)
  %
  % Take messages one by one, waiting at most 0.01 seconds for each.
  % Stops when Max messages were taken or a wait times out.

  get_messages_sync(Queue, Max, Messages) :-
      (   Messages = [Msg|Rest],
          succ(Max1, Max),                % fails when Max is 0
          thread_get_message(Queue, Msg, [timeout(0.01)])
      ->  get_messages_sync(Queue, Max1, Rest)
      ;   Messages = []
      ).
  304
  305
  306                 /*******************************
  307                 *            READERS           *
  308                 *******************************/
  309
  310/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  311The next layer consists of `readers'.   Whenever  one or more websockets
  312have   data,   the   socket   is    added   to   Hub.queues.ready   and
  313create_reader_threads/1 is called. This  examines   the  number of ready
  314sockets and fires a number  of  threads   to  handle  the read requests.
  315Multiple threads are mainly needed for the case that a client signals to
  316be  ready,  but  only  provides  an   incomplete  message,  causing  the
  317ws_receive/2 to block.
  318
  319Each  of  the  threads  reads  the  next   message  and  sends  this  to
  320Hub.queues.event. The websocket is then rescheduled   to listen for new
  321events. This read either fires a thread   to  listen for the new waiting
  322socket using create_wait_thread/1 or, if there   are no more websockets,
  323does this job itself. This  deals  with   the  common  scenario that one
  324client wakes up, starts a thread to  read   its  event and waits for new
  325messages on the same websockets.
  326- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
  327
  %!  create_reader_threads(+Hub)
  %
  %   Start ceiling(sqrt(N)) reader threads, where N is the current
  %   number of sockets on the ready queue of Hub.

  create_reader_threads(Hub) :-
      ReadyQueue = Hub.queues.ready,
      message_queue_property(ReadyQueue, size(Pending)),
      Readers is ceiling(sqrt(Pending)),
      forall(between(1, Readers, _),
             create_reader_thread(Hub)).

  %!  create_reader_thread(+Hub)
  %
  %   Start one temporary hub thread (task prefix `hub_read_ws_`) that
  %   runs read_message/1.

  create_reader_thread(Hub) :-
      TaskPrefix = hub_read_ws_,
      hub_thread(read_message(Hub), Hub, TaskPrefix).
  336
  %!  read_message(+Hub)
  %
  %   Reader loop. Takes the next websocket from Hub.queues.ready
  %   without blocking and reads one message from it.
  %
  %     - A close message is passed to close_client/2, after which the
  %       reader continues with the next ready socket.
  %     - Any other message is extended with the `client` and `hub`
  %       keys and sent to Hub.queues.event; the socket goes back to
  %       the wait queue. If the ready queue is then empty this thread
  %       turns into a waiter, otherwise it starts a waiter and reads
  %       on.
  %     - If ws_receive/2 raised an exception for a socket that is
  %       still registered, io_read_error/2 is called and the reader
  %       continues.
  %     - Sockets that are no longer registered are skipped.
  %
  %   Succeeds (ending the thread) when the ready queue is empty.
  %
  %   Compared to the previous version, the close branch now continues
  %   reading like the error branches do, so a reader that handles a
  %   close no longer leaves sockets behind on the ready queue. The
  %   second test for `close` in the event branch was unreachable and
  %   has been removed.

  read_message(Hub) :-
      Queues = Hub.queues,
      thread_get_message(Queues.ready, WS, [timeout(0)]),
      !,
      catch(ws_receive(WS, Message), Error, true),
      (   var(Error),
          websocket(HubName, WS, _, _, Id)
      ->  (   Message.get(opcode) == close
          ->  close_client(WS, Message),
              read_message(Hub)               % keep draining the ready queue
          ;   Event = Message.put(_{client:Id, hub:HubName}),
              debug(hub(event), 'Event: ~p', [Event]),
              thread_send_message(Queues.event, Event),
              thread_send_message(Queues.wait, WS),
              (   message_queue_property(Queues.ready, size(0))
              ->  !,
                  wait_for_sockets(Hub)
              ;   create_wait_thread(Hub),
                  read_message(Hub)
              )
          )
      ;   websocket(_, WS, _, _, _)
      ->  io_read_error(WS, Error),
          read_message(Hub)
      ;   read_message(Hub)                   % already destroyed
      ).
  read_message(_).
  368
  %!  ws_warning(+Error)
  %
  %   Print Error as a warning, unless it is an error(Formal, _) term
  %   whose Formal is listed by silent/1.

  ws_warning(Error) :-
      (   Error = error(Formal, _),
          silent(Formal)
      ->  true
      ;   print_message(warning, Error)
      ).

  % silent(?Formal)
  %
  % Formal error terms that ws_warning/1 does not report.

  silent(socket_error(epipe, _)).
 io_read_error(+WebSocket, +Error)
Called on a read error from WebSocket. We close the websocket and send the hub an event that we lost the connection to the specified client. Note that we leave destruction of the anonymous message queue and mutex to the Prolog garbage collector.
  %!  io_read_error(+WebSocket, +Error)
  %
  %   Handle a read error on WebSocket. If the socket is still
  %   registered, its websocket/5 fact is removed, the socket is closed
  %   with code 1011 and a `left` event with reason `read` is sent to
  %   the event queue of its hub. If the socket was already removed
  %   nothing happens.

  io_read_error(WebSocket, Error) :-
      debug(hub(gate), 'Got read error on ~w: ~p',
            [WebSocket, Error]),
      (   retract(websocket(HubName, WebSocket, _Queue, _Lock, Id))
      ->  CloseError = error(_,_),
          catch(ws_close(WebSocket, 1011, Error), CloseError,
                ws_warning(CloseError)),
          hub(HubName, Hub),
          Left = hub{left:Id,
                     hub:HubName,
                     reason:read,
                     error:Error},
          thread_send_message(Hub.queues.event, Left)
      ;   true                            % already considered gone
      ).
  399
  %!  close_client(+WebSocket, +Message)
  %
  %   Handle a close Message received on WebSocket.
  %
  %     - If the message data is `end_of_file`, the connection was lost
  %       and this is treated as a read error (see io_read_error/2).
  %     - Otherwise the websocket/5 fact is removed, the socket is
  %       closed with code 1000 and a `left` event with reason `close`
  %       and the message data is sent to the event queue of the hub.
  %     - If the socket is no longer registered (another thread already
  %       removed it) nothing is done. This matches the behaviour of
  %       io_read_error/2 and io_write_error/3 and avoids that the
  %       calling reader fails on this race.

  close_client(WebSocket, Message) :-
      Message.get(data) == end_of_file,
      !,
      io_read_error(WebSocket, end_of_file).
  close_client(WebSocket, Message) :-
      retract(websocket(HubName, WebSocket, _Queue, _Lock, Id)),
      !,
      E = error(_,_),
      catch(ws_close(WebSocket, 1000, "Bye"), E,
            ws_warning(E)),
      hub(HubName, Hub),
      thread_send_message(Hub.queues.event,
                          hub{left:Id,
                              hub:HubName,
                              reason:close,
                              data:Message.data
                             }).
  close_client(_, _).                       % already considered gone
 io_write_error(+WebSocket, +Message, +Error)
Failed to write Message to WebSocket due to Error. Note that this may be a pending but closed WebSocket. We first check whether there is a new one and if not send a left message and pass the error such that the client can re-send it when appropriate.
  %!  io_write_error(+WebSocket, +Message, +Error)
  %
  %   Handle a failure to write Message to WebSocket. If the socket is
  %   still registered, its websocket/5 fact is removed and the socket
  %   is closed with code 1011, ignoring any exception from the close.
  %   A `left` event with reason write(Message) is sent to the hub
  %   unless another websocket is registered under the same client Id.
  %   If the socket was already removed nothing happens.

  io_write_error(WebSocket, Message, Error) :-
      debug(hub(gate), 'Got write error on ~w: ~p',
            [WebSocket, Error]),
      (   retract(websocket(HubName, WebSocket, _Queue, _Lock, Id))
      ->  catch(ws_close(WebSocket, 1011, Error), _, true),
          (   \+ websocket(_, _, _, _, Id)    % no replacement connection
          ->  hub(HubName, Hub),
              Left = hub{left:Id,
                         hub:HubName,
                         reason:write(Message),
                         error:Error},
              thread_send_message(Hub.queues.event, Left)
          ;   true
          )
      ;   true                            % already considered gone
      ).
  441
  442
  443                 /*******************************
  444                 *        SENDING MESSAGES      *
  445                 *******************************/
  446
  447/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  448My  initial  thought  about  sending  messages    was  to  add  a  tuple
  449WebSocket-Message to an output  queue  and   have  a  dynamic  number of
  450threads sending these messages to the   websockets. But, it is desirable
  451that, if multiple messages are sent to  a particular client, they arrive
  452in this order. As multiple threads are performing this task, this is not
  453easy to guarantee. Therefore, we create an  output queue and a mutex for
  454each client. An output thread will   walk  along the websockets, looking
  455for one that has pending messages.  It   then  grabs the lock associated
  456with the client and sends all waiting output messages.
  457
  458The price is that we might peek   a significant number of message queues
  459before we find one that  contains  messages.   If  this  proves  to be a
  460significant problem, we  could  maintain  a   queue  of  queues  holding
  461messages.
  462- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
 hub_send(+ClientId, +Message) is semidet
Send message to the indicated ClientId. Fails silently if ClientId does not exist.
Arguments:
Message- is either a single message (as accepted by ws_send/2) or a list of such messages.
  %!  hub_send(+ClientId, +Message) is semidet.
  %
  %   Queue Message for the client ClientId and start an output thread
  %   for the client's queue. Fails silently if ClientId is not
  %   registered.
  %
  %   Client ids may be reused: hub_add/3 adds the newest websocket/5
  %   fact first, so several facts can carry the same id. The lookup is
  %   committed to the first (newest) match. Without the cut the
  %   predicate left a choice point, and backtracking into it queued
  %   the message again on an older connection.
  %
  %   @arg Message is either a single message or a list of messages;
  %        the elements of a list are queued one by one, in order.

  hub_send(ClientId, Message) :-
      websocket(HubName, _WS, ClientQueue, _Lock, ClientId),
      hub(HubName, Hub),
      !,                                  % commit to the newest connection
      (   is_list(Message)
      ->  maplist(queue_output(ClientQueue), Message)
      ;   queue_output(ClientQueue, Message)
      ),
      create_output_thread(Hub, ClientQueue).
  480
  %!  create_output_thread(+Hub, +Queue)
  %
  %   Start a temporary hub thread (task prefix `hub_out_q_`) that
  %   sends the messages pending on Queue without blocking.

  create_output_thread(Hub, Queue) :-
      NoWait = [timeout(0)],
      hub_thread(broadcast_from_queue(Queue, NoWait),
                 Hub, hub_out_q_).
 hub_broadcast(+HubName, +Message) is det
 hub_broadcast(+HubName, +Message, :Condition) is det
Send Message to all websockets associated with the hub named HubName for which call(Condition, Id) succeeds. Note that this process is asynchronous: this predicate returns immediately after putting all requests in the output queues of the selected clients. If a message cannot be delivered due to a network error, the hub is informed through a hub{left:ClientId, reason:write(Message), error:Error} event on Hub.queues.event.
  %!  hub_broadcast(+HubName, +Message)
  %
  %   Send Message to every client of the hub HubName. Same as
  %   hub_broadcast/3 with a condition that accepts every client.

  hub_broadcast(HubName, Message) :-
      hub_broadcast(HubName, Message, all).

  % all(?ClientId)
  %
  % Condition that is true for any client.

  all(_).
  499
  %!  hub_broadcast(+HubName, +Message, :Condition)
  %
  %   Put Message on the output queue of every client of the hub
  %   HubName for which call(Condition, Id) succeeds, and then start
  %   broadcast threads sized to the number of selected clients.
  %
  %   @arg HubName must be an atom; the call fails if no such hub is
  %        registered.

  hub_broadcast(HubName, Message, Condition) :-
      must_be(atom, HubName),
      hub(HubName, Hub),
      Counter = count(0),
      forall(( websocket(HubName, _Socket, Queue, _Mutex, ClientId),
               call(Condition, ClientId)
             ),
             ( queue_output(Queue, Message),
               % the count must survive backtracking of forall/2
               arg(1, Counter, Seen),
               Next is Seen+1,
               nb_setarg(1, Counter, Next)
             )),
      arg(1, Counter, Total),
      create_broadcast_threads(Hub, Total).
  512
  % queue_output(+Queue, +Message)
  %
  % Append Message to the message queue Queue.

  queue_output(OutQueue, Msg) :-
      thread_send_message(OutQueue, Msg).

  % inc_count(+State)
  %
  % Add one to the first argument of State, using nb_setarg/3 so that
  % the new value is kept on backtracking.

  inc_count(Counter) :-
      arg(1, Counter, Old),
      New is Old+1,
      nb_setarg(1, Counter, New).
  520
  %!  create_broadcast_threads(+Hub, +Count)
  %
  %   Start ceiling(sqrt(Count)) broadcast threads for Hub, where Count
  %   is the number of clients that have a message queued.

  create_broadcast_threads(Hub, Count) :-
      Workers is ceiling(sqrt(Count)),
      forall(between(1, Workers, _),
             create_broadcast_thread(Hub)).

  %!  create_broadcast_thread(+Hub)
  %
  %   Start one temporary hub thread (task prefix `hub_out_all_`) that
  %   walks the output queues of Hub without blocking.

  create_broadcast_thread(Hub) :-
      NoWait = [timeout(0)],
      hub_thread(broadcast_from_queues(Hub, NoWait),
                 Hub, hub_out_all_).
 broadcast_from_queues(+Hub, +Options) is det
Broadcast the pending messages from all known client queues of Hub.
  %!  broadcast_from_queues(+Hub, +Options)
  %
  %   Call broadcast_from_queue/2 on the output queue of every
  %   websocket registered for Hub. Options is passed on unchanged.

  broadcast_from_queues(Hub, Options) :-
      HubName = Hub.name,
      forall(websocket(HubName, _Socket, OutQueue, _Mutex, _ClientId),
             broadcast_from_queue(OutQueue, Options)).
 broadcast_from_queue(+Queue, +Options) is det
Send all messages pending for Queue. Note that this predicate locks the mutex associated with the Queue, such that other workers cannot start sending messages to this client. Concurrent sending would lead to out-of-order arrival of broadcast messages. If the mutex is already held, someone else is processing this message queue, so we don't have to worry.
  %!  broadcast_from_queue(+Queue, +Options)
  %
  %   Send the messages pending on Queue to its websocket. Nothing is
  %   done if Queue is empty or no websocket is registered for it.
  %   Otherwise the sending runs while holding the client's mutex; if
  %   the mutex cannot be obtained right away the call succeeds without
  %   sending.

  broadcast_from_queue(Queue, Options) :-
      (   message_queue_property(Queue, size(0))
      ->  true                            % nothing pending
      ;   websocket(_Hub, _WebSocket, Queue, Lock, _Id)
      ->  ignore(setup_call_cleanup(
                     mutex_trylock(Lock),
                     broadcast_from_queue_sync(Queue, Options),
                     mutex_unlock(Lock)))
      ;   true                            % client is gone
      ).
  563
  564% Note that we re-fetch websocket/5, such that we terminate if something
  565% closed the websocket.
  566
  % broadcast_from_queue_sync(+Queue, +Options)
  %
  % Send the messages on Queue to its websocket, one at a time, until
  % the websocket is no longer registered or no message can be fetched
  % using Options. websocket/5 is looked up again for every message,
  % so sending stops when the socket was closed in the meantime. A
  % write exception is handed to io_write_error/3; the outcome of a
  % single send never stops the loop.

  broadcast_from_queue_sync(Queue, Options) :-
      (   websocket(_Hub, WebSocket, Queue, _Lock, _Id),
          thread_get_message(Queue, Message, Options)
      ->  debug(hub(broadcast),
                'To: ~p messages: ~p', [WebSocket, Message]),
          ignore(catch(ws_send(WebSocket, Message), E,
                       io_write_error(WebSocket, Message, E))),
          broadcast_from_queue_sync(Queue, Options)
      ;   true
      ).
 hub_thread(:Goal, +Hub, +Task) is det
Create a (temporary) thread for the hub to perform Task. We create named threads if debugging hub(thread) is enabled.
  %!  hub_thread(:Goal, +Hub, +Task)
  %
  %   Run Goal in a new detached thread. If the debug topic hub(thread)
  %   is enabled the thread gets an alias generated from Task using
  %   gensym/2, which makes it recognisable in thread listings;
  %   otherwise the thread is anonymous. The Hub argument is not used.
  %
  %   The final clause was missing its terminating full stop, which is
  %   a syntax error; it has been added.

  hub_thread(Goal, _, Task) :-
      debugging(hub(thread)),
      !,
      gensym(Task, Alias),
      thread_create(Goal, _, [detached(true), alias(Alias)]).
  hub_thread(Goal, _, _) :-
      thread_create(Goal, _, [detached(true)]).