View source with formatted comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jan Wielemaker
    4    E-mail:        J.Wielemaker@vu.nl
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2014-2018, VU University Amsterdam
    7                              CWI Amsterdam
    8    All rights reserved.
    9
   10    Redistribution and use in source and binary forms, with or without
   11    modification, are permitted provided that the following conditions
   12    are met:
   13
   14    1. Redistributions of source code must retain the above copyright
   15       notice, this list of conditions and the following disclaimer.
   16
   17    2. Redistributions in binary form must reproduce the above copyright
   18       notice, this list of conditions and the following disclaimer in
   19       the documentation and/or other materials provided with the
   20       distribution.
   21
   22    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   23    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   24    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   25    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   26    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   27    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   28    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   29    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   30    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   31    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   32    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   33    POSSIBILITY OF SUCH DAMAGE.
   34*/
   35
   36:- module(hub,
   37          [ hub_create/3,               % +HubName, -Hub, +Options
   38            hub_add/3,                  % +HubName, +Websocket, ?Id
   39            hub_send/2,                 % +ClientId, +Message
   40            hub_broadcast/2,            % +HubName, +Message
   41            hub_broadcast/3,            % +HubName, +Message, +Condition
   42            current_hub/2               % ?HubName, ?Hub
   43          ]).   44:- use_module(library(debug)).   45:- use_module(library(error)).   46:- use_module(library(apply)).   47:- use_module(library(gensym)).   48:- if(exists_source(library(uuid))).   49:- use_module(library(uuid)).   50:- endif.   51:- use_module(library(ordsets)).   52:- use_module(library(http/websocket)).   53
   54:- meta_predicate
   55    hub_broadcast(+,+,1).   56
   57/** <module> Manage a hub for websockets
   58
   59This library manages a hub that consists   of clients that are connected
   60using a websocket. Messages arriving at any   of the websockets are sent
   61to the _event_ queue  of  the  hub.   In  addition,  the  hub provides a
   62_broadcast_ interface. A typical usage scenario  for   a  hub is a _chat
   63server_ A scenario for realizing an chat server is:
   64
   65  1. Create a new hub using hub_create/3.
   66  2. Create one or more threads that listen to Hub.queues.event from
   67     the created hub.  These threads can update the shared view of the
   68     world. A message is a dict as returned by ws_receive/2 or a
   69     hub control message. Currently, the following control messages
   70     are defined:
   71
   72       - hub{left:ClientId, reason:Reason, error:Error}
   73       A client left us because of an I/O error.  Reason is =read=
   74       or =write= and Error is the Prolog I/O exception.
   75
   76       - hub{joined:ClientId}
   77       A new client has joined the chatroom.
   78
   79     The thread(s) can talk to clients using two predicates:
   80
   81       - hub_send/2 sends a message to a specific client
   82       - hub_broadcast/2 sends a message to all clients of the
   83         hub.
   84
   85A hub consists of (currenty) four message   queues  and a simple dynamic
   86fact. Threads that are needed for the communication tasks are created on
   87demand and die if no more work needs to be done.
   88
   89@tbd    The current design does not use threads to perform tasks for
   90        multiple hubs.  This implies that the design scales rather
   91        poorly for hosting many hubs with few users.
   92*/
   93
   94:- dynamic
   95    hub/2,                          % Hub, Queues ...
   96    websocket/5.                    % Hub, Socket, Queue, Lock, Id
   97
   98:- volatile hub/2, websocket/5.   99
  100%!  hub_create(+Name, -Hub, +Options) is det.
  101%
  102%   Create a new hub. Hub is a  dict containing the following public
  103%   information:
  104%
  105%     - Hub.name
  106%       The name of the hub (the Name argument)
  107%     - queues.event
  108%       Message queue to which the hub thread(s) can listen.
  109%
  110%   After creating a hub, the application  normally creates a thread
  111%   that listens to Hub.queues.event and  exposes some mechanisms to
  112%   establish websockets and add them to the hub using hub_add/3.
  113%
  114%   @see    http_upgrade_to_websocket/3 establishes a websocket from
  115%           the SWI-Prolog webserver.
  116
  117hub_create(HubName, Hub, _Options) :-
  118    must_be(atom, HubName),
  119    message_queue_create(WaitQueue),
  120    message_queue_create(ReadyQueue),
  121    message_queue_create(EventQueue),
  122    message_queue_create(BroadcastQueue),
  123    Hub = hub{name:HubName,
  124              queues:_{wait:WaitQueue,
  125                       ready:ReadyQueue,
  126                       event:EventQueue,
  127                       broadcast:BroadcastQueue
  128                      }},
  129    assertz(hub(HubName, Hub)).
  130
  131
  132%!  current_hub(?Name, ?Hub) is nondet.
  133%
  134%   True when there exists a hub Hub with Name.
  135
  136current_hub(HubName, Hub) :-
  137    hub(HubName, Hub).
  138
  139
  140                 /*******************************
  141                 *            WAITERS           *
  142                 *******************************/
  143
  144/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  145The task of this layer is to wait   for  (a potentially large number of)
  146websockets. Whenever there is data on one   of these sockets, the socket
  147is handed to Hub.queues.ready. This is realised using wait_for_input/3,
  148which allows a single thread  to  wait   for  many  sockets.  But ... on
  149Windows it allows to wait for at most  64 sockets. In addition, there is
  150no way to add an additional input   for control messages because Windows
  151select() can only wait for sockets. On Unix   we could use pipe/2 to add
  152the control channal. On Windows  we   would  need  an additional network
  153service, giving rise its own  problems   with  allocation, firewalls and
  154security.
  155
  156So, instead we keep a queue of websockets   that  need to be waited for.
  157Whenever we add a  websocket,  we  create   a  waiter  thread  that will
  158typically start waiting for this socket.   In  addition, we schedule any
  159waiting thread that has less  than  the   maximum  number  of sockets to
  160timeout at as good as we can the same   time.  All of them will hunt for
  161the same set of queues,  but  they  have   to  wait  for  each other and
  162therefore most of the time one thread will walk away with all websockets
  163and the others commit suicide because there is nothing to wait for.
  164- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
  165
  166:- meta_predicate
  167    hub_thread(0, +, +).  168
  169%!  hub_add(+Hub, +WebSocket, ?Id) is det.
  170%
  171%   Add a WebSocket to the hub. Id is used to identify this user. It may
  172%   be provided (as a ground term) or is generated as a UUID.
  173
  174hub_add(HubName, WebSocket, Id) :-
  175    must_be(atom, HubName),
  176    hub(HubName, Hub),
  177    (   var(Id)
  178    ->  uuid(Id)
  179    ;   true
  180    ),
  181    message_queue_create(OutputQueue),
  182    mutex_create(Lock),
  183                                         % asserta/1 allows for reuse of Id
  184    asserta(websocket(HubName, WebSocket, OutputQueue, Lock, Id)),
  185    thread_send_message(Hub.queues.wait, WebSocket),
  186    thread_send_message(Hub.queues.event,
  187                        hub{joined:Id}),
  188    debug(hub(gate), 'Joined ~w: ~w', [HubName, Id]),
  189    create_wait_thread(Hub).
  190
  191:- if(\+current_predicate(uuid/1)).  192% FIXME: Proper pure Prolog random UUID implementation
  193uuid(UUID) :-
  194    A is random(1<<63),
  195    format(atom(UUID), '~d', [A]).
  196:- endif.  197
  198create_wait_thread(Hub) :-
  199    hub_thread(wait_for_sockets(Hub), Hub, hub_wait_).
  200
  201wait_for_sockets(Hub) :-
  202    wait_for_sockets(Hub, 64).
  203
  204wait_for_sockets(Hub, Max) :-
  205    Queues = Hub.queues,
  206    repeat,
  207      get_messages(Queues.wait, Max, List),
  208      (   List \== []
  209      ->  create_new_waiter_if_needed(Hub),
  210          sort(List, Set),
  211          (   debugging(hub(wait))
  212          ->  length(Set, Len),
  213              debug(hub(wait), 'Waiting for ~d queues', [Len])
  214          ;   true
  215          ),
  216          wait_for_set(Set, Left, ReadySet, Max),
  217          (   ReadySet \== []
  218          ->  debug(hub(ready), 'Data on ~p', [ReadySet]),
  219              Ready = Queues.ready,
  220              maplist(thread_send_message(Ready), ReadySet),
  221              create_reader_threads(Hub),
  222              ord_subtract(Set, ReadySet, NotReadySet)
  223          ;   NotReadySet = Left             % timeout
  224          ),
  225          (   NotReadySet \== []
  226          ->  debug(hub(wait), 'Re-scheduling: ~p', [NotReadySet]),
  227              Wait = Queues.wait,
  228              maplist(thread_send_message(Wait), NotReadySet),
  229              fail
  230          ;   true
  231          )
  232      ;   !
  233      ).
  234
  235create_new_waiter_if_needed(Hub) :-
  236    message_queue_property(Hub.queues.wait, size(0)),
  237    !.
  238create_new_waiter_if_needed(Hub) :-
  239    create_wait_thread(Hub).
  240
  241%!  wait_for_set(+Set0, -Left, -Ready, +Max) is det.
  242%
  243%   Wait for input from Set0.  Note that Set0 may contain closed
  244%   websockets.
  245
  246wait_for_set([], [], [], _) :-
  247    !.
  248wait_for_set(Set0, Set, ReadySet, Max) :-
  249    wait_timeout(Set0, Max, Timeout),
  250    catch(wait_for_input(Set0, ReadySet, Timeout),
  251          error(existence_error(stream, S), _), true),
  252    (   var(S)
  253    ->  Set = Set0
  254    ;   delete(Set0, S, Set1),
  255        wait_for_set(Set1, Set, ReadySet, Max)
  256    ).
  257
  258
  259%!  wait_timeout(+WaitForList, +Max, -TimeOut) is det.
  260%
  261%   Determine the timeout, such that   multiple  threads waiting for
  262%   less than the maximum number of  sockets   time  out at the same
  263%   moment and we can combine them on a single thread.
  264
  265:- dynamic
  266    scheduled_timeout/1.  267
  268wait_timeout(List, Max, Timeout) :-
  269    length(List, Max),
  270    !,
  271    Timeout = infinite.
  272wait_timeout(_, _, Timeout) :-
  273    get_time(Now),
  274    (   scheduled_timeout(SchedAt)
  275    ->  (   SchedAt > Now
  276        ->  At = SchedAt
  277        ;   retractall(scheduled_timeout(_)),
  278            At is ceiling(Now) + 1,
  279            asserta(scheduled_timeout(At))
  280        )
  281    ;   At is ceiling(Now) + 1,
  282        asserta(scheduled_timeout(At))
  283    ),
  284    Timeout is At - Now.
  285
  286
  287%!  get_messages(+Queue, +Max, -List) is det.
  288%
  289%   Get the next Max messages from  Queue   or  as many as there are
  290%   available without blocking very long.   This routine is designed
  291%   such that if multiple threads are running for messages, one gets
  292%   all of them and the others nothing.
  293
  294get_messages(Q, N, List) :-
  295    with_mutex(hub_wait,
  296               get_messages_sync(Q, N, List)).
  297
  298get_messages_sync(Q, N, [H|T]) :-
  299    succ(N2, N),
  300    thread_get_message(Q, H, [timeout(0.01)]),
  301    !,
  302    get_messages_sync(Q, N2, T).
  303get_messages_sync(_, _, []).
  304
  305
  306                 /*******************************
  307                 *            READERS           *
  308                 *******************************/
  309
  310/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  311The next layer consists of `readers'.   Whenever  one or more websockets
  312have   data,   the   socket   is    added   to   Hub.queues.ready   and
  313create_reader_threads/1 is called. This  examines   the  number of ready
  314sockets and fires a number  of  threads   to  handle  the read requests.
  315Multiple threads are mainly needed for the case that a client signals to
  316be  ready,  but  only  provides  an   incomplete  message,  causing  the
  317ws_receive/2 to block.
  318
  319Each  of  the  threads  reads  the  next   message  and  sends  this  to
  320Hub.queues.event. The websocket is then rescheduled   to listen for new
  321events. This read either fires a thread   to  listen for the new waiting
  322socket using create_wait_thread/1 or, if there   are no more websockets,
  323does this job itself. This  deals  with   the  common  scenario that one
  324client wakes up, starts a thread to  read   its  event and waits for new
  325messages on the same websockets.
  326- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
  327
  328create_reader_threads(Hub) :-
  329    message_queue_property(Hub.queues.ready, size(Ready)),
  330    Threads is ceiling(sqrt(Ready)),
  331    forall(between(1, Threads, _),
  332           create_reader_thread(Hub)).
  333
  334create_reader_thread(Hub) :-
  335    hub_thread(read_message(Hub), Hub, hub_read_ws_).
  336
  337read_message(Hub) :-
  338    Queues = Hub.queues,
  339    thread_get_message(Queues.ready, WS, [timeout(0)]),
  340    !,
  341    catch(ws_receive(WS, Message), Error, true),
  342    (   var(Error),
  343        websocket(HubName, WS, _, _, Id)
  344    ->  (   Message.get(opcode) == close
  345        ->  close_client(WS, Message)
  346        ;   Event = Message.put(_{client:Id, hub:HubName}),
  347            debug(hub(event), 'Event: ~p', [Event]),
  348            thread_send_message(Queues.event, Event),
  349            (   Message.get(opcode) == close
  350            ->  CloseError = error(_,_),
  351                catch(ws_close(WS, 1000, ""), CloseError,
  352                      ws_warning(CloseError))
  353            ;   thread_send_message(Queues.wait, WS)
  354            ),
  355            (   message_queue_property(Queues.ready, size(0))
  356            ->  !,
  357                wait_for_sockets(Hub)
  358            ;   create_wait_thread(Hub),
  359                read_message(Hub)
  360            )
  361        )
  362    ;   websocket(_, WS, _, _, _)
  363    ->  io_read_error(WS, Error),
  364        read_message(Hub)
  365    ;   read_message(Hub)                   % already destroyed
  366    ).
  367read_message(_).
  368
  369ws_warning(error(Formal, _)) :-
  370    silent(Formal),
  371    !.
  372ws_warning(Error) :-
  373    print_message(warning, Error).
  374
  375silent(socket_error(epipe, _)).
  376
  377%!  io_read_error(+WebSocket, +Error)
  378%
  379%   Called on a read error from WebSocket.   We  close the websocket and
  380%   send the hub an event that we   lost the connection to the specified
  381%   client. Note that we leave  destruction   of  the  anonymous message
  382%   queue and mutex to the Prolog garbage collector.
  383
  384io_read_error(WebSocket, Error) :-
  385    debug(hub(gate), 'Got read error on ~w: ~p',
  386          [WebSocket, Error]),
  387    retract(websocket(HubName, WebSocket, _Queue, _Lock, Id)),
  388    !,
  389    E = error(_,_),
  390    catch(ws_close(WebSocket, 1011, Error), E,
  391          ws_warning(E)),
  392    hub(HubName, Hub),
  393    thread_send_message(Hub.queues.event,
  394                        hub{left:Id,
  395                            hub:HubName,
  396                            reason:read,
  397                            error:Error}).
  398io_read_error(_, _).                      % already considered gone
  399
  400close_client(WebSocket, Message) :-
  401    Message.get(data) == end_of_file,
  402    !,
  403    io_read_error(WebSocket, end_of_file).
  404close_client(WebSocket, Message) :-
  405    retract(websocket(HubName, WebSocket, _Queue, _Lock, Id)),
  406    !,
  407    E = error(_,_),
  408    catch(ws_close(WebSocket, 1000, "Bye"), E,
  409          ws_warning(E)),
  410    hub(HubName, Hub),
  411    thread_send_message(Hub.queues.event,
  412                        hub{left:Id,
  413                            hub:HubName,
  414                            reason:close,
  415                            data:Message.data
  416                           }).
  417
  418%!  io_write_error(+WebSocket, +Message, +Error)
  419%
  420%   Failed to write Message to WebSocket due   to  Error. Note that this
  421%   may be a pending but closed WebSocket.  We first check whether there
  422%   is a new one and if not  send   a  `left` message and pass the error
  423%   such that the client can re-send it when appropriate.
  424
  425io_write_error(WebSocket, Message, Error) :-
  426    debug(hub(gate), 'Got write error on ~w: ~p',
  427          [WebSocket, Error]),
  428    retract(websocket(HubName, WebSocket, _Queue, _Lock, Id)),
  429    !,
  430    catch(ws_close(WebSocket, 1011, Error), _, true),
  431    (   websocket(_, _, _, _, Id)
  432    ->  true
  433    ;   hub(HubName, Hub),
  434        thread_send_message(Hub.queues.event,
  435                            hub{left:Id,
  436                                hub:HubName,
  437                                reason:write(Message),
  438                                error:Error})
  439    ).
  440io_write_error(_, _, _).                      % already considered gone
  441
  442
  443                 /*******************************
  444                 *        SENDING MESSAGES      *
  445                 *******************************/
  446
  447/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  448My  initial  thought  about  sending  messages    was  to  add  a  tuple
  449WebSocket-Message to an output  queue  and   have  a  dynamic  number of
  450threads sending these messages to the   websockets. But, it is desirable
  451that, if multiple messages are sent to  a particular client, they arrive
  452in this order. As multiple threads are performing this task, this is not
  453easy to guarantee. Therefore, we create an  output queue and a mutex for
  454each client. An output thread will   walk  along the websockets, looking
  455for one that has pending messages.  It   then  grabs the lock associated
  456with the client and sends all waiting output messages.
  457
  458The price is that we might peek   a significant number of message queues
  459before we find one that  contains  messages.   If  this  proves  to be a
  460significant problem, we  could  maintain  a   queue  of  queues  holding
  461messages.
  462- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
  463
  464%!  hub_send(+ClientId, +Message) is semidet.
  465%
  466%   Send message to the indicated ClientId.   Fails silently if ClientId
  467%   does not exist.
  468%
  469%   @arg    Message is either a single message (as accepted by
  470%           ws_send/2) or a list of such messages.
  471
  472hub_send(ClientId, Message) :-
  473    websocket(HubName, _WS, ClientQueue, _Lock, ClientId),
  474    hub(HubName, Hub),
  475    (   is_list(Message)
  476    ->  maplist(queue_output(ClientQueue), Message)
  477    ;   queue_output(ClientQueue, Message)
  478    ),
  479    create_output_thread(Hub, ClientQueue).
  480
  481create_output_thread(Hub, Queue) :-
  482    hub_thread(broadcast_from_queue(Queue, [timeout(0)]),
  483               Hub, hub_out_q_).
  484
  485%!  hub_broadcast(+Hub, +Message) is det.
  486%!  hub_broadcast(+Hub, +Message, :Condition) is det.
  487%
  488%   Send Message to all websockets  associated   with  Hub for which
  489%   call(Condition,  Id)  succeeds.  Note  that    this  process  is
  490%   _asynchronous_: this predicate returns immediately after putting
  491%   all requests in a  broadcast  queue.   If  a  message  cannot be
  492%   delivered due to a network error,   the  hub is informed through
  493%   io_error/3.
  494
  495hub_broadcast(HubName, Message) :-
  496    hub_broadcast(HubName, Message, all).
  497
  498all(_).
  499
  500hub_broadcast(HubName, Message, Condition) :-
  501    must_be(atom, HubName),
  502    hub(HubName, Hub),
  503    State = count(0),
  504    forall(( websocket(HubName, _WS, ClientQueue, _Lock, Id),
  505             call(Condition, Id)
  506           ),
  507           ( queue_output(ClientQueue, Message),
  508             inc_count(State)
  509           )),
  510    State = count(Count),
  511    create_broadcast_threads(Hub, Count).
  512
  513queue_output(Queue, Message) :-
  514    thread_send_message(Queue, Message).
  515
  516inc_count(State) :-
  517    arg(1, State, C0),
  518    C1 is C0+1,
  519    nb_setarg(1, State, C1).
  520
  521create_broadcast_threads(Hub, Count) :-
  522    Threads is ceiling(sqrt(Count)),
  523    forall(between(1, Threads, _),
  524           create_broadcast_thread(Hub)).
  525
  526create_broadcast_thread(Hub) :-
  527    hub_thread(broadcast_from_queues(Hub, [timeout(0)]),
  528                    Hub, hub_out_all_).
  529
  530
  531%!  broadcast_from_queues(+Hub, +Options) is det.
  532%
  533%   Broadcast from over all known queues.
  534
  535broadcast_from_queues(Hub, Options) :-
  536    forall(websocket(Hub.name, _WebSocket, Queue, _Lock, _Id),
  537           broadcast_from_queue(Queue, Options)).
  538
  539
  540%!  broadcast_from_queue(+Queue, +Options) is det.
  541%
  542%   Send all messages pending for Queue.   Note  that this predicate
  543%   locks the mutex associated  with  the   Queue,  such  that other
  544%   workers cannot start sending messages to this client. Concurrent
  545%   sending  would  lead  to  out-of-order    arrival  of  broadcast
  546%   messages.  If  the  mutex  is  already  held,  someone  else  is
  547%   processing this message queue, so we don't have to worry.
  548
  549broadcast_from_queue(Queue, _Options) :-
  550    message_queue_property(Queue, size(0)),
  551    !.
  552broadcast_from_queue(Queue, Options) :-
  553    websocket(_Hub, _WebSocket, Queue, Lock, _Id),
  554    !,
  555    (   setup_call_cleanup(
  556            mutex_trylock(Lock),
  557            broadcast_from_queue_sync(Queue, Options),
  558            mutex_unlock(Lock))
  559    ->  true
  560    ;   true
  561    ).
  562broadcast_from_queue(_, _).
  563
  564% Note that we re-fetch websocket/5, such that we terminate if something
  565% closed the websocket.
  566
  567broadcast_from_queue_sync(Queue, Options) :-
  568    repeat,
  569      (   websocket(_Hub, WebSocket, Queue, _Lock, _Id),
  570          thread_get_message(Queue, Message, Options)
  571      ->  debug(hub(broadcast),
  572                'To: ~p messages: ~p', [WebSocket, Message]),
  573          catch(ws_send(WebSocket, Message), E,
  574                io_write_error(WebSocket, Message, E)),
  575          fail
  576      ;   !
  577      ).
  578
  579%!  hub_thread(:Goal, +Hub, +Task) is det.
  580%
  581%   Create a (temporary) thread for the hub to perform Task. We
  582%   created named threads if debugging hub(thread) is enabled.
  583
  584hub_thread(Goal, _, Task) :-
  585    debugging(hub(thread)),
  586    !,
  587    gensym(Task, Alias),
  588    thread_create(Goal, _, [detached(true), alias(Alias)]).
  589hub_thread(Goal, _, _) :-
  590    thread_create(Goal, _, [detached(true)])