/*  Part of SWI-Prolog

    Author:        Jan Wielemaker
    E-mail:        J.Wielemaker@vu.nl
    WWW:           http://www.swi-prolog.org
    Copyright (c)  2014-2018, VU University Amsterdam
                              CWI Amsterdam
    All rights reserved.

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions
    are met:

    1. Redistributions of source code must retain the above copyright
       notice, this list of conditions and the following disclaimer.

    2. Redistributions in binary form must reproduce the above copyright
       notice, this list of conditions and the following disclaimer in
       the documentation and/or other materials provided with the
       distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
    POSSIBILITY OF SUCH DAMAGE.
*/

:- module(hub,
          [ hub_create/3,               % +HubName, -Hub, +Options
            hub_add/3,                  % +HubName, +Websocket, ?Id
            hub_send/2,                 % +ClientId, +Message
            hub_broadcast/2,            % +HubName, +Message
            hub_broadcast/3,            % +HubName, +Message, +Condition
            current_hub/2               % ?HubName, ?Hub
          ]).
:- use_module(library(debug)).
:- use_module(library(error)).
:- use_module(library(apply)).
:- use_module(library(gensym)).
:- if(exists_source(library(uuid))).
:- use_module(library(uuid)).
:- endif.
:- use_module(library(ordsets)).
:- use_module(library(http/websocket)).

:- meta_predicate
    hub_broadcast(+,+,1).

/** <module> Manage a hub for websockets

This library manages a hub that consists of clients that are connected
using a websocket. Messages arriving at any of the websockets are sent
to the _event_ queue of the hub. In addition, the hub provides a
_broadcast_ interface. A typical usage scenario for a hub is a _chat
server_, which can be realized as follows:

  1. Create a new hub using hub_create/3.
  2. Create one or more threads that listen to Hub.queues.event from
     the created hub. These threads can update the shared view of the
     world. A message is a dict as returned by ws_receive/2 or a
     hub control message. Currently, the following control messages
     are defined:

       - hub{left:ClientId, hub:HubName, reason:Reason, error:Error}
       A client left us because of an I/O error.  Reason is =read=
       or write(Message), where Message is the message that could
       not be delivered, and Error is the Prolog I/O exception.

       - hub{left:ClientId, hub:HubName, reason:close, data:Data}
       A client closed the websocket.  Data is the data of the
       close message.

       - hub{joined:ClientId}
       A new client has joined the chatroom.

     The thread(s) can talk to clients using two predicates:

       - hub_send/2 sends a message to a specific client
       - hub_broadcast/2 sends a message to all clients of the
         hub.

A hub consists of (currently) four message queues and a simple dynamic
fact. Threads that are needed for the communication tasks are created on
demand and die if no more work needs to be done.

@tbd    The current design does not share threads between hubs: each
        thread performs tasks for a single hub.  This implies that the
        design scales rather poorly for hosting many hubs with few users.
*/

:- dynamic
    hub/2,                          % HubName, Hub
    websocket/5.                    % HubName, WebSocket, Queue, Lock, Id

:- volatile hub/2, websocket/5.

%!  hub_create(+Name, -Hub, +Options) is det.
%
%   Create a new hub.
%   Hub is a dict containing the following public information:
%
%     - Hub.name
%       The name of the hub (the Name argument)
%     - Hub.queues.event
%       Message queue to which the hub thread(s) can listen.
%
%   After creating a hub, the application normally creates a thread
%   that listens to Hub.queues.event and exposes some mechanisms to
%   establish websockets and add them to the hub using hub_add/3.
%
%   @see    http_upgrade_to_websocket/3 establishes a websocket from
%           the SWI-Prolog webserver.

hub_create(HubName, Hub, _Options) :-
    must_be(atom, HubName),
    message_queue_create(WaitQueue),
    message_queue_create(ReadyQueue),
    message_queue_create(EventQueue),
    message_queue_create(BroadcastQueue),
    Hub = hub{name:HubName,
              queues:_{wait:WaitQueue,
                       ready:ReadyQueue,
                       event:EventQueue,
                       broadcast:BroadcastQueue
                      }},
    assertz(hub(HubName, Hub)).


%!  current_hub(?Name, ?Hub) is nondet.
%
%   True when there exists a hub Hub with Name.

current_hub(HubName, Hub) :-
    hub(HubName, Hub).


                 /*******************************
                 *           WAITERS            *
                 *******************************/

/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
The task of this layer is to wait for (a potentially large number of)
websockets. Whenever there is data on one of these sockets, the socket
is handed to Hub.queues.ready. This is realised using wait_for_input/3,
which allows a single thread to wait for many sockets. But ... on
Windows it can wait for at most 64 sockets. In addition, there is
no way to add an additional input for control messages because Windows
select() can only wait for sockets. On Unix we could use pipe/2 to add
the control channel. On Windows we would need an additional network
service, giving rise to its own problems with allocation, firewalls and
security.

So, instead we keep a queue of websockets that need to be waited for.
Whenever we add a websocket, we create a waiter thread that will
typically start waiting for this socket. In addition, we schedule every
waiting thread that has fewer than the maximum number of sockets to
time out, as far as possible, at the same moment. All of them will hunt
for the same set of queues, but they have to wait for each other and
therefore most of the time one thread will walk away with all websockets
and the others commit suicide because there is nothing to wait for.
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */

:- meta_predicate
    hub_thread(0, +, +).

%!  hub_add(+Hub, +WebSocket, ?Id) is det.
%
%   Add a WebSocket to the hub. Id is used to identify this user. It may
%   be provided (as a ground term) or is generated as a UUID.

hub_add(HubName, WebSocket, Id) :-
    must_be(atom, HubName),
    hub(HubName, Hub),
    (   var(Id)
    ->  uuid(Id)
    ;   true
    ),
    message_queue_create(OutputQueue),
    mutex_create(Lock),
                                        % asserta/1 allows for reuse of Id
    asserta(websocket(HubName, WebSocket, OutputQueue, Lock, Id)),
    thread_send_message(Hub.queues.wait, WebSocket),
    thread_send_message(Hub.queues.event,
                        hub{joined:Id}),
    debug(hub(gate), 'Joined ~w: ~w', [HubName, Id]),
    create_wait_thread(Hub).

:- if(\+current_predicate(uuid/1)).
% FIXME: Proper pure Prolog random UUID implementation
uuid(UUID) :-
    A is random(1<<63),
    format(atom(UUID), '~d', [A]).
:- endif.

create_wait_thread(Hub) :-
    hub_thread(wait_for_sockets(Hub), Hub, hub_wait_).

wait_for_sockets(Hub) :-
    wait_for_sockets(Hub, 64).

wait_for_sockets(Hub, Max) :-
    Queues = Hub.queues,
    repeat,
      get_messages(Queues.wait, Max, List),
      (   List \== []
      ->  create_new_waiter_if_needed(Hub),
          sort(List, Set),
          (   debugging(hub(wait))
          ->  length(Set, Len),
              debug(hub(wait), 'Waiting for ~d queues', [Len])
          ;   true
          ),
          wait_for_set(Set, Left, ReadySet, Max),
          (   ReadySet \== []
          ->  debug(hub(ready), 'Data on ~p', [ReadySet]),
              Ready = Queues.ready,
              maplist(thread_send_message(Ready), ReadySet),
              create_reader_threads(Hub),
              ord_subtract(Set, ReadySet, NotReadySet)
          ;   NotReadySet = Left                % timeout
          ),
          (   NotReadySet \== []
          ->  debug(hub(wait), 'Re-scheduling: ~p', [NotReadySet]),
              Wait = Queues.wait,
              maplist(thread_send_message(Wait), NotReadySet),
              fail
          ;   true
          )
      ;   !
      ).

create_new_waiter_if_needed(Hub) :-
    message_queue_property(Hub.queues.wait, size(0)),
    !.
create_new_waiter_if_needed(Hub) :-
    create_wait_thread(Hub).

%!  wait_for_set(+Set0, -Left, -Ready, +Max) is det.
%
%   Wait for input from Set0.  Note that Set0 may contain closed
%   websockets.

wait_for_set([], [], [], _) :-
    !.
wait_for_set(Set0, Set, ReadySet, Max) :-
    wait_timeout(Set0, Max, Timeout),
    catch(wait_for_input(Set0, ReadySet, Timeout),
          error(existence_error(stream, S), _), true),
    (   var(S)
    ->  Set = Set0
    ;   delete(Set0, S, Set1),
        wait_for_set(Set1, Set, ReadySet, Max)
    ).


%!  wait_timeout(+WaitForList, +Max, -TimeOut) is det.
%
%   Determine the timeout, such that multiple threads waiting for
%   fewer than the maximum number of sockets time out at the same
%   moment and we can combine them on a single thread.

:- dynamic
    scheduled_timeout/1.

wait_timeout(List, Max, Timeout) :-
    length(List, Max),
    !,
    Timeout = infinite.
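% Worked example for the clause below (illustrative numbers only): a
% thread waiting for fewer than Max sockets that calls wait_timeout/3
% at Now = 1000.3 finds no scheduled_timeout/1 fact and schedules
% At = ceiling(1000.3) + 1 = 1002, so it waits about 1.7 seconds.  A
% second such thread calling at Now = 1000.8 finds SchedAt = 1002 > Now
% and reuses it, waiting about 1.2 seconds.  Both time out at 1002, put
% their sockets back on Hub.queues.wait and compete for them again
% through get_messages/3, where usually a single thread collects all.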
wait_timeout(_, _, Timeout) :-
    get_time(Now),
    (   scheduled_timeout(SchedAt)
    ->  (   SchedAt > Now
        ->  At = SchedAt
        ;   retractall(scheduled_timeout(_)),
            At is ceiling(Now) + 1,
            asserta(scheduled_timeout(At))
        )
    ;   At is ceiling(Now) + 1,
        asserta(scheduled_timeout(At))
    ),
    Timeout is At - Now.


%!  get_messages(+Queue, +Max, -List) is det.
%
%   Get the next Max messages from Queue or as many as there are
%   available without blocking very long.  This routine is designed
%   such that if multiple threads are competing for messages, one gets
%   all of them and the others nothing.

get_messages(Q, N, List) :-
    with_mutex(hub_wait,
               get_messages_sync(Q, N, List)).

get_messages_sync(Q, N, [H|T]) :-
    succ(N2, N),
    thread_get_message(Q, H, [timeout(0.01)]),
    !,
    get_messages_sync(Q, N2, T).
get_messages_sync(_, _, []).


                 /*******************************
                 *           READERS            *
                 *******************************/

/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
The next layer consists of `readers'. Whenever one or more websockets
have data, the socket is added to Hub.queues.ready and
create_reader_threads/1 is called. This examines the number of ready
sockets and fires a number of threads to handle the read requests.
Multiple threads are mainly needed for the case that a client signals to
be ready, but only provides an incomplete message, causing ws_receive/2
to block.

Each of the threads reads the next message and sends this to
Hub.queues.event. The websocket is then rescheduled to listen for new
events. The reader then either fires a thread to wait for the
rescheduled socket using create_wait_thread/1 or, if no further
websockets are ready to be read, does this job itself.
This deals with the common scenario that one
client wakes up, starts a thread to read its event and waits for new
messages on the same websocket.
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */

create_reader_threads(Hub) :-
    message_queue_property(Hub.queues.ready, size(Ready)),
    Threads is ceiling(sqrt(Ready)),
    forall(between(1, Threads, _),
           create_reader_thread(Hub)).

create_reader_thread(Hub) :-
    hub_thread(read_message(Hub), Hub, hub_read_ws_).

read_message(Hub) :-
    Queues = Hub.queues,
    thread_get_message(Queues.ready, WS, [timeout(0)]),
    !,
    catch(ws_receive(WS, Message), Error, true),
    (   var(Error),
        websocket(HubName, WS, _, _, Id)
    ->  (   Message.get(opcode) == close
        ->  close_client(WS, Message)
        ;   Event = Message.put(_{client:Id, hub:HubName}),
            debug(hub(event), 'Event: ~p', [Event]),
            thread_send_message(Queues.event, Event),
            (   Message.get(opcode) == close
            ->  CloseError = error(_,_),
                catch(ws_close(WS, 1000, ""), CloseError,
                      ws_warning(CloseError))
            ;   thread_send_message(Queues.wait, WS)
            ),
            (   message_queue_property(Queues.ready, size(0))
            ->  !,
                wait_for_sockets(Hub)
            ;   create_wait_thread(Hub),
                read_message(Hub)
            )
        )
    ;   websocket(_, WS, _, _, _)
    ->  io_read_error(WS, Error),
        read_message(Hub)
    ;   read_message(Hub)                   % already destroyed
    ).
read_message(_).

ws_warning(error(Formal, _)) :-
    silent(Formal),
    !.
ws_warning(Error) :-
    print_message(warning, Error).

silent(socket_error(epipe, _)).

%!  io_read_error(+WebSocket, +Error)
%
%   Called on a read error from WebSocket.  We close the websocket and
%   send the hub an event that we lost the connection to the specified
%   client.  Note that we leave destruction of the anonymous message
%   queue and mutex to the Prolog garbage collector.

io_read_error(WebSocket, Error) :-
    debug(hub(gate), 'Got read error on ~w: ~p',
          [WebSocket, Error]),
    retract(websocket(HubName, WebSocket, _Queue, _Lock, Id)),
    !,
    E = error(_,_),
    catch(ws_close(WebSocket, 1011, Error), E,
          ws_warning(E)),
    hub(HubName, Hub),
    thread_send_message(Hub.queues.event,
                        hub{left:Id,
                            hub:HubName,
                            reason:read,
                            error:Error}).
io_read_error(_, _).                    % already considered gone

close_client(WebSocket, Message) :-
    Message.get(data) == end_of_file,
    !,
    io_read_error(WebSocket, end_of_file).
close_client(WebSocket, Message) :-
    retract(websocket(HubName, WebSocket, _Queue, _Lock, Id)),
    !,
    E = error(_,_),
    catch(ws_close(WebSocket, 1000, "Bye"), E,
          ws_warning(E)),
    hub(HubName, Hub),
    thread_send_message(Hub.queues.event,
                        hub{left:Id,
                            hub:HubName,
                            reason:close,
                            data:Message.data
                           }).

%!  io_write_error(+WebSocket, +Message, +Error)
%
%   Failed to write Message to WebSocket due to Error.  Note that this
%   may be a pending but closed WebSocket.  We first check whether there
%   is a new one and if not send a `left` message and pass the error
%   such that the client can re-send it when appropriate.

io_write_error(WebSocket, Message, Error) :-
    debug(hub(gate), 'Got write error on ~w: ~p',
          [WebSocket, Error]),
    retract(websocket(HubName, WebSocket, _Queue, _Lock, Id)),
    !,
    catch(ws_close(WebSocket, 1011, Error), _, true),
    (   websocket(_, _, _, _, Id)
    ->  true
    ;   hub(HubName, Hub),
        thread_send_message(Hub.queues.event,
                            hub{left:Id,
                                hub:HubName,
                                reason:write(Message),
                                error:Error})
    ).
io_write_error(_, _, _).
                                        % already considered gone


                 /*******************************
                 *       SENDING MESSAGES       *
                 *******************************/

/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
My initial thought about sending messages was to add a tuple
WebSocket-Message to an output queue and have a dynamic number of
threads sending these messages to the websockets. However, it is
desirable that, if multiple messages are sent to a particular client,
they arrive in the order in which they were sent. As multiple threads
are performing this task, this is not easy to guarantee. Therefore, we
create an output queue and a mutex for each client. An output thread
will walk along the websockets, looking for one that has pending
messages. It then grabs the lock associated with the client and sends
all waiting output messages.

The price is that we might have to peek at a significant number of
message queues before we find one that contains messages. If this
proves to be a significant problem, we could maintain a queue of queues
holding messages.
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */

%!  hub_send(+ClientId, +Message) is semidet.
%
%   Send message to the indicated ClientId.  Fails silently if ClientId
%   does not exist.
%
%   @arg Message is either a single message (as accepted by
%   ws_send/2) or a list of such messages.

hub_send(ClientId, Message) :-
    websocket(HubName, _WS, ClientQueue, _Lock, ClientId),
    hub(HubName, Hub),
    (   is_list(Message)
    ->  maplist(queue_output(ClientQueue), Message)
    ;   queue_output(ClientQueue, Message)
    ),
    create_output_thread(Hub, ClientQueue).

create_output_thread(Hub, Queue) :-
    hub_thread(broadcast_from_queue(Queue, [timeout(0)]),
               Hub, hub_out_q_).

%!  hub_broadcast(+Hub, +Message) is det.
%!  hub_broadcast(+Hub, +Message, :Condition) is det.
%
%   Send Message to all websockets associated with Hub for which
%   call(Condition, Id) succeeds.  Note that this process is
%   _asynchronous_: this predicate returns immediately after adding
%   Message to the output queue of each selected client.  If a message
%   cannot be delivered due to a network error, the hub receives a
%   `left` event through io_write_error/3.

hub_broadcast(HubName, Message) :-
    hub_broadcast(HubName, Message, all).

all(_).

hub_broadcast(HubName, Message, Condition) :-
    must_be(atom, HubName),
    hub(HubName, Hub),
    State = count(0),
    forall(( websocket(HubName, _WS, ClientQueue, _Lock, Id),
             call(Condition, Id)
           ),
           (   queue_output(ClientQueue, Message),
               inc_count(State)
           )),
    State = count(Count),
    create_broadcast_threads(Hub, Count).

queue_output(Queue, Message) :-
    thread_send_message(Queue, Message).

inc_count(State) :-
    arg(1, State, C0),
    C1 is C0+1,
    nb_setarg(1, State, C1).

create_broadcast_threads(Hub, Count) :-
    Threads is ceiling(sqrt(Count)),
    forall(between(1, Threads, _),
           create_broadcast_thread(Hub)).

create_broadcast_thread(Hub) :-
    hub_thread(broadcast_from_queues(Hub, [timeout(0)]),
               Hub, hub_out_all_).


%!  broadcast_from_queues(+Hub, +Options) is det.
%
%   Send the pending messages from the output queues of all websockets
%   of Hub.

broadcast_from_queues(Hub, Options) :-
    forall(websocket(Hub.name, _WebSocket, Queue, _Lock, _Id),
           broadcast_from_queue(Queue, Options)).


%!  broadcast_from_queue(+Queue, +Options) is det.
%
%   Send all messages pending for Queue.  Note that this predicate
%   locks the mutex associated with the Queue, such that other
%   workers cannot start sending messages to this client.  Concurrent
%   sending would lead to out-of-order arrival of broadcast
%   messages.  If the mutex is already held, someone else is
%   processing this message queue, so we don't have to worry.

broadcast_from_queue(Queue, _Options) :-
    message_queue_property(Queue, size(0)),
    !.
broadcast_from_queue(Queue, Options) :-
    websocket(_Hub, _WebSocket, Queue, Lock, _Id),
    !,
    (   setup_call_cleanup(
            mutex_trylock(Lock),
            broadcast_from_queue_sync(Queue, Options),
            mutex_unlock(Lock))
    ->  true
    ;   true
    ).
broadcast_from_queue(_, _).

% Note that we re-fetch websocket/5, such that we terminate if something
% closed the websocket.

broadcast_from_queue_sync(Queue, Options) :-
    repeat,
      (   websocket(_Hub, WebSocket, Queue, _Lock, _Id),
          thread_get_message(Queue, Message, Options)
      ->  debug(hub(broadcast),
                'To: ~p messages: ~p', [WebSocket, Message]),
          catch(ws_send(WebSocket, Message), E,
                io_write_error(WebSocket, Message, E)),
          fail
      ;   !
      ).

%!  hub_thread(:Goal, +Hub, +Task) is det.
%
%   Create a (temporary) thread for the hub to perform Task.  We
%   create named threads if debugging hub(thread) is enabled.

hub_thread(Goal, _, Task) :-
    debugging(hub(thread)),
    !,
    gensym(Task, Alias),
    thread_create(Goal, _, [detached(true), alias(Alias)]).
hub_thread(Goal, _, _) :-
    thread_create(Goal, _, [detached(true)]).
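
/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
Example: a minimal chat server on top of this library, following the
scenario in the module header (create a hub, process Hub.queues.event
in one thread and add websockets using hub_add/3).  This is only an
illustrative sketch and is not part of the library.  The hub name
`chat` and the predicates chat_server/1, accept_chat/1, chatroom/1 and
handle_event/2 are hypothetical.  The option guarded(false) is used
because accept_chat/1 returns immediately while the hub keeps using
the websocket; with the default guarded(true), the websocket would be
closed as soon as accept_chat/1 completes.

    :- use_module(library(http/thread_httpd)).
    :- use_module(library(http/http_dispatch)).
    :- use_module(library(http/websocket)).
    :- use_module(library(http/hub)).

    :- http_handler(root(chat),
                    http_upgrade_to_websocket(accept_chat,
                                              [guarded(false)]),
                    []).

    chat_server(Port) :-
        hub_create(chat, Hub, []),
        thread_create(chatroom(Hub), _, [alias(chatroom)]),
        http_server(http_dispatch, [port(Port)]).

    % Hand each new websocket to the hub; an Id is generated for it
    accept_chat(WebSocket) :-
        hub_add(chat, WebSocket, _Id).

    % Event loop: process events from the hub forever
    chatroom(Hub) :-
        thread_get_message(Hub.queues.event, Event),
        handle_event(Event, Hub),
        chatroom(Hub).

    % Relay a text message to every client except its sender
    handle_event(Event, Hub) :-
        _{opcode:text, client:From, data:Text} :< Event,
        !,
        hub_broadcast(Hub.name, text(Text), \==(From)).
    % Greet a client that joined; hub_send/2 fails if it already left
    handle_event(hub{joined:Id}, _) :-
        !,
        ignore(hub_send(Id, text("Welcome"))).
    % Ignore everything else (left events, pings, ...)
    handle_event(_, _).

Passing \==(From) as Condition uses the fact that hub_broadcast/3
calls call(Condition, Id) for each client, so the message goes to all
clients whose Id differs from the sender's.
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */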