1/* Part of SWI-Prolog 2 3 Author: Jan Wielemaker 4 E-mail: J.Wielemaker@vu.nl 5 WWW: http://www.swi-prolog.org 6 Copyright (c) 2014-2018, VU University Amsterdam 7 CWI Amsterdam 8 All rights reserved. 9 10 Redistribution and use in source and binary forms, with or without 11 modification, are permitted provided that the following conditions 12 are met: 13 14 1. Redistributions of source code must retain the above copyright 15 notice, this list of conditions and the following disclaimer. 16 17 2. Redistributions in binary form must reproduce the above copyright 18 notice, this list of conditions and the following disclaimer in 19 the documentation and/or other materials provided with the 20 distribution. 21 22 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 23 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 24 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 25 FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 26 COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 27 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 28 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 29 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 30 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 31 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 32 ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 33 POSSIBILITY OF SUCH DAMAGE. 34*/ 35 36:- module(hub, 37 [ hub_create/3, % +HubName, -Hub, +Options 38 hub_add/3, % +HubName, +Websocket, ?Id 39 hub_send/2, % +ClientId, +Message 40 hub_broadcast/2, % +HubName, +Message 41 hub_broadcast/3, % +HubName, +Message, +Condition 42 current_hub/2 % ?HubName, ?Hub 43 ]). 44:- use_module(library(debug)). 45:- use_module(library(error)). 46:- use_module(library(apply)). 47:- use_module(library(gensym)). 48:- if(exists_source(library(uuid))). 49:- use_module(library(uuid)). 
:- endif.
:- use_module(library(ordsets)).
:- use_module(library(http/websocket)).

% The third argument of hub_broadcast/3 is a closure that is called with
% one additional argument (the client Id), hence the meta-argument
% specifier 1.  The first two arguments are plain input arguments.
:- meta_predicate
    hub_broadcast(+,+,1).
% Runtime state of the library.  Both predicates are volatile, so they
% are not written to a saved state.
%
%   hub/2:        Hub, Queues ...
%   websocket/5:  Hub, Socket, Queue, Lock, Id

:- dynamic hub/2.
:- dynamic websocket/5.

:- volatile hub/2.
:- volatile websocket/5.
thread(s)
can listen. After creating a hub, the application normally creates a thread that listens to Hub.queues.event and exposes some mechanisms to establish websockets and add them to the hub using hub_add/3.
%!  hub_create(+HubName, -Hub, +Options)
%
%   Create a new hub and register it as hub(HubName, Hub).  HubName must
%   be an atom.  Hub is a dict with tag `hub` that holds the `name` of
%   the hub and a sub-dict `queues` with four freshly created message
%   queues: `wait`, `ready`, `event` and `broadcast`.  Options is
%   currently not inspected.

hub_create(HubName, Hub, _Options) :-
    must_be(atom, HubName),
    % Create the four queues in a fixed order: wait, ready, event,
    % broadcast.
    length(QueueList, 4),
    maplist(message_queue_create, QueueList),
    QueueList = [Wait, Ready, Event, Broadcast],
    Queues = _{ wait:      Wait,
                ready:     Ready,
                event:     Event,
                broadcast: Broadcast
              },
    Hub = hub{name:HubName, queues:Queues},
    assertz(hub(HubName, Hub)).
%!  current_hub(?HubName, ?Hub)
%
%   True when Hub is the hub dict registered under HubName.  Enumerates
%   the registered hubs on backtracking.

current_hub(HubName, Hub) :-
    hub(HubName, Hub).


                 /*******************************
                 *            WAITERS           *
                 *******************************/

/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
The task of this layer is to wait for (a potentially large number of)
websockets. Whenever there is data on one of these sockets, the socket
is handed to Hub.queues.ready. This is realised using wait_for_input/3,
which allows a single thread to wait for many sockets. But ... on
Windows it allows to wait for at most 64 sockets. In addition, there is
no way to add an additional input for control messages because Windows
select() can only wait for sockets. On Unix we could use pipe/2 to add
the control channel. On Windows we would need an additional network
service, giving rise to its own problems with allocation, firewalls and
security.

So, instead we keep a queue of websockets that need to be waited for.
Whenever we add a websocket, we create a waiter thread that will
typically start waiting for this socket. In addition, we schedule any
waiting thread that has less than the maximum number of sockets to
time out, as well as we can, at the same time. All of them will hunt
for the same set of queues, but they have to wait for each other and
therefore most of the time one thread will walk away with all websockets
and the others terminate because there is nothing to wait for.
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */

% The first argument of hub_thread/3 is a goal that is handed to
% thread_create/3, hence the meta-argument specifier 0.
:- meta_predicate
    hub_thread(0, +, +).
%!  hub_add(+HubName, +WebSocket, ?Id)
%
%   Add WebSocket to the hub registered as HubName.  If Id is unbound it
%   is bound to a fresh identifier obtained from uuid/1.  The predicate
%   registers a websocket/5 fact with a new output queue and mutex,
%   hands WebSocket to the `wait` queue of the hub, posts a
%   hub{joined:Id} message on the `event` queue and makes sure there is
%   a thread waiting for input.  Fails if no hub is registered under
%   HubName.

hub_add(HubName, WebSocket, Id) :-
    must_be(atom, HubName),
    hub(HubName, Hub),
    (   nonvar(Id)
    ->  true
    ;   uuid(Id)
    ),
    message_queue_create(OutputQueue),
    mutex_create(Lock),
    % asserta/1 allows for reuse of Id
    asserta(websocket(HubName, WebSocket, OutputQueue, Lock, Id)),
    HubQueues = Hub.queues,
    thread_send_message(HubQueues.wait, WebSocket),
    thread_send_message(HubQueues.event, hub{joined:Id}),
    debug(hub(gate), 'Joined ~w: ~w', [HubName, Id]),
    create_wait_thread(Hub).

:- if(\+current_predicate(uuid/1)).
% FIXME: Proper pure Prolog random UUID implementation
uuid(UUID) :-
    Number is random(1<<63),
    format(atom(UUID), '~d', [Number]).
:- endif.

%   create_wait_thread(+Hub)
%
%   Start a thread that runs wait_for_sockets/1 for Hub.

create_wait_thread(Hub) :-
    hub_thread(wait_for_sockets(Hub), Hub, hub_wait_).

%   wait_for_sockets(+Hub)
%
%   Wait for input on at most 64 of the sockets that are queued in the
%   `wait` queue of Hub.

wait_for_sockets(Hub) :-
    wait_for_sockets(Hub, 64).

%   wait_for_sockets(+Hub, +Max)
%
%   Repeatedly fetch up to Max sockets from the `wait` queue and wait
%   for input on them.  Sockets with input are forwarded to the `ready`
%   queue; the others are put back into the `wait` queue and the loop is
%   restarted.  The loop ends when there is nothing (left) to wait for.

wait_for_sockets(Hub, Max) :-
    Queues = Hub.queues,
    repeat,
      get_messages(Queues.wait, Max, Pending),
      (   Pending == []
      ->  !                                 % nothing to wait for
      ;   create_new_waiter_if_needed(Hub),
          sort(Pending, Sockets),
          debug_wait_set(Sockets),
          wait_for_set(Sockets, Alive, ReadySet, Max),
          forward_ready_sockets(Hub, Sockets, Alive, ReadySet, Remaining),
          (   Remaining == []
          ->  true
          ;   debug(hub(wait), 'Re-scheduling: ~p', [Remaining]),
              WaitQueue = Queues.wait,
              maplist(thread_send_message(WaitQueue), Remaining),
              fail                          % restart the repeat loop
          )
      ).

%   debug_wait_set(+Sockets)
%
%   Report the number of sockets we wait for if hub(wait) is debugged.

debug_wait_set(Sockets) :-
    debugging(hub(wait)),
    !,
    length(Sockets, Count),
    debug(hub(wait), 'Waiting for ~d queues', [Count]).
debug_wait_set(_).

%   forward_ready_sockets(+Hub, +Sockets, +Alive, +ReadySet, -Remaining)
%
%   If ReadySet is not empty, send its members to the `ready` queue,
%   start reader threads and unify Remaining with Sockets minus
%   ReadySet.  Otherwise (timeout) Remaining is Alive.

forward_ready_sockets(Hub, Sockets, Alive, ReadySet, Remaining) :-
    (   ReadySet == []
    ->  Remaining = Alive                   % timeout
    ;   debug(hub(ready), 'Data on ~p', [ReadySet]),
        ReadyQueue = Hub.queues.ready,
        maplist(thread_send_message(ReadyQueue), ReadySet),
        create_reader_threads(Hub),
        ord_subtract(Sockets, ReadySet, Remaining)
    ).

%   create_new_waiter_if_needed(+Hub)
%
%   Start an additional waiter thread unless the `wait` queue of Hub is
%   empty.

create_new_waiter_if_needed(Hub) :-
    (   message_queue_property(Hub.queues.wait, size(0))
    ->  true
    ;   create_wait_thread(Hub)
    ).
%   wait_for_set(+Set0, -Set, -ReadySet, +Max)
%
%   Wait for input on the streams in Set0 using wait_for_input/3 with
%   the timeout computed by wait_timeout/3.  If the wait raises an
%   existence error for a stream, that stream is deleted from the set
%   and the wait is retried.  Set is the set that was eventually waited
%   for and ReadySet holds the streams that have input.  An empty input
%   set yields empty output sets without waiting.

wait_for_set([], [], [], _) :-
    !.
wait_for_set(Set0, Set, ReadySet, Max) :-
    wait_timeout(Set0, Max, Timeout),
    Gone = error(existence_error(stream, Closed), _),
    catch(wait_for_input(Set0, ReadySet, Timeout), Gone, true),
    (   nonvar(Closed)                      % Closed no longer exists
    ->  delete(Set0, Closed, Set1),
        wait_for_set(Set1, Set, ReadySet, Max)
    ;   Set = Set0
    ).
:- dynamic
    scheduled_timeout/1.

%   wait_timeout(+List, +Max, -Timeout)
%
%   Determine the timeout for waiting on List.  If List holds exactly
%   Max elements Timeout is `infinite`.  Otherwise Timeout is the number
%   of seconds until the time stamp stored in scheduled_timeout/1.  If
%   no such time stamp is stored, or the stored one is not in the
%   future, a new one is stored that is the current time rounded up to
%   a whole second plus one.

wait_timeout(List, Max, Timeout) :-
    length(List, Max),
    !,
    Timeout = infinite.
wait_timeout(_, _, Timeout) :-
    get_time(Now),
    (   scheduled_timeout(SchedAt)
    ->  (   SchedAt > Now
        ->  At = SchedAt
        ;   retractall(scheduled_timeout(_)),
            schedule_timeout_after(Now, At)
        )
    ;   schedule_timeout_after(Now, At)
    ),
    Timeout is At - Now.

%   schedule_timeout_after(+Now, -At)
%
%   Compute At as ceiling(Now)+1 and record it in scheduled_timeout/1.

schedule_timeout_after(Now, At) :-
    At is ceiling(Now) + 1,
    asserta(scheduled_timeout(At)).
%   get_messages(+Queue, +Max, -List)
%
%   Fetch at most Max messages from Queue into List.  Fetching runs
%   while holding the mutex `hub_wait`.

get_messages(Queue, Max, List) :-
    with_mutex(hub_wait,
               get_messages_sync(Queue, Max, List)).

%   get_messages_sync(+Queue, +Max, -List)
%
%   Collect messages from Queue until Max messages are collected or no
%   message arrives within 0.01 seconds.

get_messages_sync(Queue, Max, List) :-
    (   succ(Left, Max),
        thread_get_message(Queue, Message, [timeout(0.01)])
    ->  List = [Message|Rest],
        get_messages_sync(Queue, Left, Rest)
    ;   List = []
    ).


                 /*******************************
                 *            READERS           *
                 *******************************/

/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
The next layer consists of `readers'. Whenever one or more websockets
have data, the socket is added to Hub.queues.ready and
create_reader_threads/1 is called. This examines the number of ready
sockets and fires a number of threads to handle the read requests.
Multiple threads are mainly needed for the case that a client signals to
be ready, but only provides an incomplete message, causing the
ws_receive/2 to block.

Each of the threads reads the next message and sends this to
Hub.queues.event. The websocket is then rescheduled to listen for new
events. This read either fires a thread to listen for the new waiting
socket using create_wait_thread/1 or, if there are no more websockets,
does this job itself. This deals with the common scenario that one
client wakes up, starts a thread to read its event and waits for new
messages on the same websockets.
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */

%   create_reader_threads(+Hub)
%
%   Start ceiling(sqrt(N)) reader threads, where N is the current size
%   of the `ready` queue of Hub.

create_reader_threads(Hub) :-
    ReadyQueue = Hub.queues.ready,
    message_queue_property(ReadyQueue, size(ReadyCount)),
    ThreadCount is ceiling(sqrt(ReadyCount)),
    forall(between(1, ThreadCount, _),
           create_reader_thread(Hub)).

%   create_reader_thread(+Hub)
%
%   Start a single thread that runs read_message/1 for Hub.

create_reader_thread(Hub) :-
    hub_thread(read_message(Hub), Hub, hub_read_ws_).

%   read_message(+Hub)
%
%   Take a websocket from the `ready` queue of Hub (without waiting) and
%   read one message from it using ws_receive/2.  Succeeds immediately
%   if the `ready` queue is empty.
%
%     - If reading succeeds and the websocket is still registered, a
%       close message is handed to close_client/2.  Any other message is
%       extended with the keys `client` and `hub` and sent to the
%       `event` queue, after which the websocket is put back into the
%       `wait` queue.  If no more websockets are ready this thread
%       continues as waiter; otherwise it starts a waiter thread and
%       reads the next ready websocket.
%     - If reading raised an exception and the websocket is still
%       registered, io_read_error/2 is called and the next ready
%       websocket is processed.
%     - If the websocket is no longer registered it is skipped.

read_message(Hub) :-
    Queues = Hub.queues,
    thread_get_message(Queues.ready, WS, [timeout(0)]),
    !,
    catch(ws_receive(WS, Message), Error, true),
    (   var(Error),
        websocket(HubName, WS, _, _, Id)
    ->  (   Message.get(opcode) == close
        ->  close_client(WS, Message)
        ;   Event = Message.put(_{client:Id, hub:HubName}),
            debug(hub(event), 'Event: ~p', [Event]),
            thread_send_message(Queues.event, Event),
            % Close messages are handled by the branch above, so at
            % this point the websocket is always rescheduled.
            thread_send_message(Queues.wait, WS),
            (   message_queue_property(Queues.ready, size(0))
            ->  !,
                wait_for_sockets(Hub)
            ;   create_wait_thread(Hub),
                read_message(Hub)
            )
        )
    ;   websocket(_, WS, _, _, _)
    ->  io_read_error(WS, Error),
        read_message(Hub)
    ;   read_message(Hub)                   % already destroyed
    ).
read_message(_).

%   ws_warning(+Error)
%
%   Print Error as a warning, unless its formal part is listed by
%   silent/1.

ws_warning(error(Formal, _)) :-
    silent(Formal),
    !.
ws_warning(Error) :-
    print_message(warning, Error).

%   silent(?Formal)
%
%   Formal error terms that ws_warning/1 does not report.

silent(socket_error(epipe, _)).
%   io_read_error(+WebSocket, +Error)
%
%   Handle a read error on WebSocket.  If the websocket is still
%   registered, its websocket/5 fact is retracted, the websocket is
%   closed with code 1011 (errors of the shape error(_,_) raised by
%   closing are passed to ws_warning/1) and a message
%   hub{left:Id, hub:HubName, reason:read, error:Error} is sent to the
%   `event` queue of the hub.  Succeeds silently if the websocket is no
%   longer registered.

io_read_error(WebSocket, Error) :-
    debug(hub(gate), 'Got read error on ~w: ~p',
          [WebSocket, Error]),
    retract(websocket(HubName, WebSocket, _Queue, _Lock, Id)),
    !,
    E = error(_,_),
    catch(ws_close(WebSocket, 1011, Error), E,
          ws_warning(E)),
    hub(HubName, Hub),
    thread_send_message(Hub.queues.event,
                        hub{left:Id,
                            hub:HubName,
                            reason:read,
                            error:Error}).
io_read_error(_, _).                      % already considered gone

%   close_client(+WebSocket, +Message)
%
%   Handle a close Message received on WebSocket.  If the data of
%   Message is `end_of_file` this is treated as a read error.
%   Otherwise the websocket/5 fact is retracted, the websocket is
%   closed with code 1000 and a message
%   hub{left:Id, hub:HubName, reason:close, data:Data} is sent to the
%   `event` queue of the hub.  Succeeds silently if the websocket is no
%   longer registered, consistent with io_read_error/2.

close_client(WebSocket, Message) :-
    Message.get(data) == end_of_file,
    !,
    io_read_error(WebSocket, end_of_file).
close_client(WebSocket, Message) :-
    retract(websocket(HubName, WebSocket, _Queue, _Lock, Id)),
    !,
    E = error(_,_),
    catch(ws_close(WebSocket, 1000, "Bye"), E,
          ws_warning(E)),
    hub(HubName, Hub),
    thread_send_message(Hub.queues.event,
                        hub{left:Id,
                            hub:HubName,
                            reason:close,
                            data:Message.data
                           }).
close_client(_, _).                       % already considered gone
%   io_write_error(+WebSocket, +Message, +Error)
%
%   Handle a failure to write Message to WebSocket.  If WebSocket is
%   still registered, its websocket/5 fact is retracted and the
%   websocket is closed with code 1011, ignoring any exception raised
%   by closing.  Unless another websocket is registered with the same
%   Id, a message hub{left:Id, hub:HubName, reason:write(Message),
%   error:Error} is sent to the `event` queue of the hub.  Succeeds
%   silently if WebSocket is no longer registered.
%
%   Remainder of the original description (its start was lost):
%   "... `left` message and pass the error such that the client can
%   re-send it when appropriate."

io_write_error(WebSocket, Message, Error) :-
    debug(hub(gate), 'Got write error on ~w: ~p',
          [WebSocket, Error]),
    retract(websocket(HubName, WebSocket, _Queue, _Lock, Id)),
    !,
    catch(ws_close(WebSocket, 1011, Error), _, true),
    (   \+ websocket(_, _, _, _, Id)        % Id is not in use anymore
    ->  hub(HubName, Hub),
        Left = hub{left:Id,
                   hub:HubName,
                   reason:write(Message),
                   error:Error},
        thread_send_message(Hub.queues.event, Left)
    ;   true
    ).
io_write_error(_, _, _).                  % already considered gone


                 /*******************************
                 *        SENDING MESSAGES      *
                 *******************************/

/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
My initial thought about sending messages was to add a tuple
WebSocket-Message to an output queue and have a dynamic number of
threads sending these messages to the websockets. But, it is desirable
that, if multiple messages are sent to a particular client, they arrive
in this order. As multiple threads are performing this task, this is not
easy to guarantee. Therefore, we create an output queue and a mutex for
each client. An output thread will walk along the websockets, looking
for one that has pending messages. It then grabs the lock associated
with the client and sends all waiting output messages.

The price is that we might peek a significant number of message queues
before we find one that contains messages. If this proves to be a
significant problem, we could maintain a queue of queues holding
messages.
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
%!  hub_send(+ClientId, +Message)
%
%   Queue Message for the client registered as ClientId and start a
%   thread that sends the queued output.  If Message is a list, each of
%   its elements is queued as a separate message.  Fails if no
%   websocket is registered for ClientId.

hub_send(ClientId, Message) :-
    websocket(HubName, _WS, OutputQueue, _Lock, ClientId),
    hub(HubName, Hub),
    (   \+ is_list(Message)
    ->  queue_output(OutputQueue, Message)
    ;   maplist(queue_output(OutputQueue), Message)
    ),
    create_output_thread(Hub, OutputQueue).

%   create_output_thread(+Hub, +Queue)
%
%   Start a thread that sends the messages pending in Queue.

create_output_thread(Hub, Queue) :-
    SendOptions = [timeout(0)],
    hub_thread(broadcast_from_queue(Queue, SendOptions),
               Hub, hub_out_q_).
%!  hub_broadcast(+HubName, +Message)
%!  hub_broadcast(+HubName, +Message, :Condition)
%
%   Queue Message for every websocket of the hub HubName for which
%   call(Condition, Id) succeeds; hub_broadcast/2 selects all
%   websockets.  Note that this process is asynchronous: this predicate
%   returns immediately after putting all requests in a broadcast
%   queue.  If a message cannot be delivered due to a network error,
%   the hub is informed through io_error/3.

hub_broadcast(HubName, Message) :-
    hub_broadcast(HubName, Message, all).

%   all(?Id)
%
%   Condition that is true for every client.

all(_).

hub_broadcast(HubName, Message, Condition) :-
    must_be(atom, HubName),
    hub(HubName, Hub),
    Counter = count(0),                     % updated destructively
    forall(( websocket(HubName, _WS, OutputQueue, _Lock, Id),
             call(Condition, Id)
           ),
           ( queue_output(OutputQueue, Message),
             inc_count(Counter)
           )),
    arg(1, Counter, Count),
    create_broadcast_threads(Hub, Count).

%   queue_output(+Queue, +Message)
%
%   Add Message to the output queue Queue.

queue_output(Queue, Message) :-
    thread_send_message(Queue, Message).

%   inc_count(+State)
%
%   Destructively increment the first argument of State by one.

inc_count(State) :-
    arg(1, State, Old),
    New is Old+1,
    nb_setarg(1, State, New).

%   create_broadcast_threads(+Hub, +Count)
%
%   Start ceiling(sqrt(Count)) threads for sending pending output.

create_broadcast_threads(Hub, Count) :-
    ThreadCount is ceiling(sqrt(Count)),
    forall(between(1, ThreadCount, _),
           create_broadcast_thread(Hub)).

%   create_broadcast_thread(+Hub)
%
%   Start a single thread that runs broadcast_from_queues/2 for Hub.

create_broadcast_thread(Hub) :-
    SendOptions = [timeout(0)],
    hub_thread(broadcast_from_queues(Hub, SendOptions),
               Hub, hub_out_all_).
%   broadcast_from_queues(+Hub, +Options)
%
%   Run broadcast_from_queue/2 on the output queue of every websocket
%   registered for the hub named Hub.name.  Fails if
%   broadcast_from_queue/2 fails for one of the queues.

broadcast_from_queues(Hub, Options) :-
    HubName = Hub.name,
    \+ ( websocket(HubName, _WebSocket, OutputQueue, _Lock, _Id),
         \+ broadcast_from_queue(OutputQueue, Options)
       ).
%   broadcast_from_queue(+Queue, +Options)
%
%   Send the messages pending in the output queue Queue to the
%   websocket it belongs to.  Nothing is done if Queue is empty, if no
%   websocket is registered for Queue or if the lock of the websocket
%   cannot be obtained immediately.  Options is passed to
%   thread_get_message/3.  Always succeeds.

broadcast_from_queue(Queue, _Options) :-
    message_queue_property(Queue, size(0)),
    !.
broadcast_from_queue(Queue, Options) :-
    websocket(_Hub, _WebSocket, Queue, Lock, _Id),
    !,
    % If mutex_trylock/1 fails the setup fails and we simply skip the
    % queue.
    ignore(setup_call_cleanup(
               mutex_trylock(Lock),
               broadcast_from_queue_sync(Queue, Options),
               mutex_unlock(Lock))).
broadcast_from_queue(_, _).

% Note that we re-fetch websocket/5, such that we terminate if something
% closed the websocket.

%   broadcast_from_queue_sync(+Queue, +Options)
%
%   Send messages from Queue to its websocket until the websocket is no
%   longer registered or thread_get_message/3 yields no message.
%   Exceptions raised by ws_send/2 are handed to io_write_error/3.

broadcast_from_queue_sync(Queue, Options) :-
    repeat,
      (   websocket(_Hub, Socket, Queue, _Lock, _Id),
          thread_get_message(Queue, Msg, Options)
      ->  debug(hub(broadcast),
                'To: ~p messages: ~p', [Socket, Msg]),
          catch(ws_send(Socket, Msg), Error,
                io_write_error(Socket, Msg, Error)),
          fail                              % next message
      ;   !
      ).
%   hub_thread(:Goal, +Hub, +Task)
%
%   Create a detached thread that runs Goal.  The thread gets an alias
%   generated from Task using gensym/2 if debugging hub(thread) is
%   enabled; otherwise it is anonymous.  The Hub argument is not used.

hub_thread(Goal, _, Task) :-
    debugging(hub(thread)),
    !,
    gensym(Task, Alias),
    thread_create(Goal, _, [detached(true), alias(Alias)]).
hub_thread(Goal, _, _) :-
    thread_create(Goal, _, [detached(true)]).
Manage a hub for websockets
This library manages a hub that consists of clients that are connected using a websocket. Messages arriving at any of the websockets are sent to the event queue of the hub. In addition, the hub provides a broadcast interface. A typical usage scenario for a hub is a chat server. A scenario for realizing a chat server is:
read
or write
and Error is the Prolog I/O exception. The
thread(s)
can talk to clients using two predicates: A hub consists of (currently) four message queues and a simple dynamic fact. Threads that are needed for the communication tasks are created on demand and die if no more work needs to be done.