/*  Part of SWI-Prolog

    Author:        Jeffrey Rosenwald and Jan Wielemaker
    E-mail:        jeffrose@acm.org
    WWW:           http://www.swi-prolog.org
    Copyright (c)  2012-2013, Jeffrey Rosenwald
                   2018, CWI Amsterdam
    All rights reserved.

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions
    are met:

    1. Redistributions of source code must retain the above copyright
       notice, this list of conditions and the following disclaimer.

    2. Redistributions in binary form must reproduce the above copyright
       notice, this list of conditions and the following disclaimer in
       the documentation and/or other materials provided with the
       distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
    POSSIBILITY OF SUCH DAMAGE.
*/

:- module(udp_broadcast,
          [ udp_broadcast_initialize/2,         % +IPAddress, +Options
            udp_broadcast_close/1,              % +Scope

            udp_peer_add/2,                     % +Scope, +IP
            udp_peer_del/2,                     % +Scope, ?IP
            udp_peer/2                          % +Scope, -IP
          ]).
:- use_module(library(socket)).
:- use_module(library(broadcast)).
:- use_module(library(option)).
:- use_module(library(apply)).
:- use_module(library(debug)).
:- use_module(library(error)).

% :- debug(udp(broadcast)).

/** <module> A UDP broadcast proxy

SWI-Prolog's broadcast library provides a means that may be used to
facilitate publish and subscribe communication regimes between anonymous
members of a community of interest. The members of the community are,
however, necessarily limited to a single instance of Prolog. The UDP
broadcast library removes that restriction. With this library loaded,
any member on your local IP subnetwork that also has this library loaded
may hear and respond to your broadcasts.

This library supports three styles of networking as described below. Each
of these networks has its own advantages and disadvantages. Please
study the literature to understand the consequences.

  $ broadcast :
  Broadcast messages are sent to the LAN subnet. The broadcast
  implementation uses two UDP ports: a public one to address the whole
  group and a private one to address a specific node. Broadcasting
  is generally a good choice if the subnet is small and traffic is
  low.

  $ unicast :
  Unicast sends copies of packets to known peers. Unicast networks
  can easily be routed. The unicast version uses a single UDP port
  per node. Unicast is generally a good choice for a small party,
  in particular if the peers are in different networks.

  $ multicast :
  Multicast is like broadcast, but it can be configured to
  work across networks and may work more efficiently on VLAN networks.
  Like the broadcast setup, two UDP ports are used. Multicasting can
  in general deliver the most efficient LAN and WAN networks, but
  requires properly configured routing between the peers.

After initialization and, in the case of a _unicast_ network, managing
the set of peers, communication happens through broadcast/1,
broadcast_request/1, listen/2 and listen/3.
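
A minimal sketch of the initialization step for two of these styles,
using the options documented for udp_broadcast_initialize/2 and
udp_peer_add/2 in this file. The predicate names start_broadcast_node/0
and start_unicast_node/0, the scope name `peers` and all IP addresses
are hypothetical placeholders, and the sketch assumes this file is
installed as library(udp_broadcast). A process would normally call only
one of the two predicates.

```prolog
:- use_module(library(udp_broadcast)).

% Broadcast style: join scope `peers` on the LAN of the (hypothetical)
% interface 192.168.1.103, passing the subnet mask explicitly.
start_broadcast_node :-
    udp_broadcast_initialize(ip(192,168,1,103),
                             [ scope(peers),
                               method(broadcast),
                               subnet_mask(ip(255,255,255,0)),
                               port(20005)
                             ]).

% Unicast style: create scope `peers` and register the known peers.
% A plain IP address uses the port registered with the scope; the
% IP:Port form names the port explicitly.
start_unicast_node :-
    udp_broadcast_initialize(ip(192,168,1,103),
                             [ scope(peers),
                               method(unicast),
                               port(20005)
                             ]),
    udp_peer_add(peers, ip(192,168,1,104)),
    udp_peer_add(peers, "192.168.1.105":20005).
```

Once either predicate has succeeded, terms wrapped as udp(peers, Term)
in broadcast/1 or broadcast_request/1 are forwarded to the peers of
that scope.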

A broadcast/1 or broadcast_request/1 of the shape udp(Scope, Term) or
udp(Scope, Term, TimeOut) is forwarded over the UDP network to all peers
that joined the same `Scope`. To prevent the potential for feedback
loops, only the plain `Term` is broadcast locally. The timeout is
optional. It specifies the amount of time to wait for replies to arrive
in response to a broadcast_request/1. The default period is 0.250
seconds. The timeout is ignored for broadcasts.

An example of three separate processes cooperating in the same _scope_
called `peers`:

==
Process A:

   ?- listen(number(X), between(1, 5, X)).
   true.

   ?-

Process B:

   ?- listen(number(X), between(7, 9, X)).
   true.

   ?-

Process C:

   ?- findall(X, broadcast_request(udp(peers, number(X))), Xs).
   Xs = [1, 2, 3, 4, 5, 7, 8, 9].

   ?-
==

It is also possible to carry on a private dialog with a single
responder. To do this, you supply a compound of the form Term:PortId
to a UDP scoped broadcast/1 or broadcast_request/1, where PortId is the
ip-address and port-id of the intended listener. If you supply an
unbound variable, PortId, to broadcast_request/1, it will be unified with
the address of the listener that responds to Term. You may send a
directed broadcast to a specific member by simply providing this address
in a similarly structured compound to a UDP scoped broadcast/1. The
message is sent via unicast to that member only by way of the member's
broadcast listener. It is received by the listener just as any other
broadcast would be. The listener does not know the difference.

For example, in order to discover who responded with a particular value:

==
Host B Process 1:

   ?- listen(number(X), between(1, 5, X)).
   true.

   ?-

Host A Process 1:

   ?- listen(number(X), between(7, 9, X)).
   true.

   ?-

Host A Process 2:

   ?- listen(number(X), between(1, 5, X)).
   true.

   ?- bagof(X, broadcast_request(udp(peers,number(X):From,1)), Xs).
   From = ip(192, 168, 1, 103):34855,
   Xs = [7, 8, 9] ;
   From = ip(192, 168, 1, 103):56331,
   Xs = [1, 2, 3, 4, 5] ;
   From = ip(192, 168, 1, 104):3217,
   Xs = [1, 2, 3, 4, 5].
==

All incoming traffic is handled by a single thread with the alias
`udp_inbound_proxy`. This thread also performs the internal dispatching
using broadcast/1 and broadcast_request/1. Future versions may provide
for handling these requests in separate threads.


## Caveats {#udp-broadcase-caveats}

While the implementation is mostly transparent, there are some important
and subtle differences that must be taken into consideration:

  * UDP broadcast requires an initialization step in order to
    launch the broadcast listener proxy. See
    udp_broadcast_initialize/2.

  * Prolog's broadcast_request/1 is nondet. It sends the request,
    then evaluates the replies synchronously, backtracking as needed
    until a satisfactory reply is received. The remaining potential
    replies are not evaluated. With UDP, all peers will send all
    answers to the query. The receiver may however stop listening.

  * A UDP broadcast/1 is completely asynchronous.

  * A UDP broadcast_request/1 is partially synchronous. A
    broadcast_request/1 is sent, then the sender waits for a period of
    time (default: 250 ms) while the replies are collected. Any reply
    that is received after this period is silently discarded. An
    optional timeout argument, as in udp(Scope, Term, TimeOut), lets a
    sender specify more (or less) time for replies.

  * Replies are presented to the user as a choice point on arrival,
    until the broadcast request timer finally expires.
    This allows traffic to propagate through the system faster and
    provides the requestor with the opportunity to terminate a
    broadcast request early if desired, by simply cutting choice points.

  * Please beware that broadcast request transactions remain active
    and resources consumed until broadcast_request/1 finally fails on
    backtracking, an uncaught exception occurs, or until choice points
    are cut. Failure to properly manage this will likely result in
    chronic exhaustion of UDP sockets.

  * If a listener is connected to a generator that always succeeds
    (e.g. a random number generator), then the broadcast request will
    never terminate and trouble is bound to ensue.

  * broadcast_request/1 with =|udp_subnet|= scope is _not_ reentrant.
    If a listener performs a broadcast_request/1 with UDP scope
    recursively, then disaster looms certain. This caveat does not apply
    to a UDP scoped broadcast/1, which can safely be performed from a
    listener context.

  * UDP broadcast's capacity is not infinite. While it can tolerate
    substantial bursts of activity, it is designed for short bursts of
    small messages. Unlike TIPC, UDP is unreliable and has no QOS
    protections. Congestion is likely to cause trouble in the form of
    non-Byzantine failure, that is, late, lost (i.e. infinitely late),
    or duplicate datagrams. Caveat emptor.

  * A UDP broadcast_request/1 term that is ground is considered to
    be a broadcast only. No replies are collected unless there is at
    least one unbound variable to unify.

  * A UDP broadcast/1 always succeeds, even if there are no
    listeners.

  * A UDP broadcast_request/1 that receives no replies will fail.

  * Replies may be coming from many different places in the network
    (or none at all). No ordering of replies is implied.

  * Prolog terms are sent to others after first converting them to
    strings using term_string/3.
    Serialization does not deal with cycles, attributes or sharing.
    The hook udp_term_string_hook/3 may be defined to change the
    message serialization and support different message formats
    and/or encryption.

  * The broadcast model is based on anonymity and a presumption of
    trust--a perfect recipe for compromise. UDP is an Internet protocol.
    A UDP broadcast listener exposes a public port, which is
    static and shared by all listeners, and a private port, which is
    semi-static and unique to the listener instance. Both can be seen
    from off-cluster nodes and networks. Usage of this module exposes
    the node and, consequently, the cluster to significant security
    risks. So have a care when designing your application. You must talk
    only to those who share and contribute to your concerns using a
    carefully prescribed protocol.

  * UDP broadcast categorically and silently ignores all message
    traffic originating from or terminating on nodes that are not
    members of the local subnet. This security measure only keeps honest
    people honest!

@author Jeffrey Rosenwald (JeffRose@acm.org), Jan Wielemaker
@license BSD-2
@see tipc.pl
*/

:- multifile
    udp_term_string_hook/3,                     % +Scope, ?Term, ?String
    udp_unicast_join_hook/3,                    % +Scope, +From, +Data
    black_list/1.                               % +Term

:- meta_predicate safely(0).

% Run Predicate, printing any error and failing instead of propagating it.
safely(Predicate) :-
    catch(Predicate, Err,
          (   Err == '$aborted'
          ->  !, fail
          ;   print_message(error, Err), fail
          )).

% BroadcastAddress is IPAddress with all host bits (the bits that are
% zero in the mask Subnet) set to one.
udp_broadcast_address(IPAddress, Subnet, BroadcastAddress) :-
    IPAddress = ip(A1, A2, A3, A4),
    Subnet = ip(S1, S2, S3, S4),
    BroadcastAddress = ip(B1, B2, B3, B4),

    B1 is A1 \/ (S1 xor 255),
    B2 is A2 \/ (S2 xor 255),
    B3 is A3 \/ (S3 xor 255),
    B4 is A4 \/ (S4 xor 255).

%!  udp_broadcast_service(?Scope, ?Address) is nondet.
%
%   Provides the UDP broadcast address for a given Scope.
%   At present, only one scope is supported, =|udp_subnet|=.

%!  udp_scope(?ScopeName, ?ScopeDef)

:- dynamic
    udp_scope/2,
    udp_scope_peer/2.
:- volatile
    udp_scope/2,
    udp_scope_peer/2.
%
%  Here's a UDP proxy to Prolog's broadcast library
%
%  A sender may extend a broadcast to a subnet of a UDP network by
%  specifying a =|udp_subnet|= scoping qualifier in his/her broadcast.
%  The qualifier has the effect of selecting the appropriate multi-cast
%  address for the transmission. Thus, the sender of the message has
%  control over the scope of his/her traffic on a per-message basis.
%
%  All in-scope listeners receive the broadcast and simply rebroadcast
%  the message locally. All broadcast replies, if any, are sent directly
%  to the sender via the port-id that was received with the broadcast.
%
%  Each listener exposes two UDP ports, a shared public port that is
%  bound to a well-known port number and a private port that uniquely
%  identifies the listener. Broadcasts are received on the public port
%  and replies are sent on the private port. Directed broadcasts
%  (unicasts) are received on the private port and replies are sent on
%  the private port.

%  Thread 1 listens for directed traffic on the private port.
%

:- dynamic
    udp_private_socket/3,                       % Port, Socket, FileNo
    udp_public_socket/4,                        % Scope, Port, Socket, FileNo
    udp_closed/1.                               % Scope

udp_inbound_proxy :-
    make_private_socket,
    forall(udp_scope(Scope, ScopeData),
           make_public_socket(ScopeData, Scope)),
    retractall(udp_closed(_)),
    findall(FileNo, udp_socket_file_no(FileNo), FileNos),
    catch(dispatch_inbound(FileNos),
          E, dispatch_exception(E)),
    udp_inbound_proxy.

dispatch_exception(E) :-
    E = error(_,_),
    !,
    print_message(warning, E).
dispatch_exception(_).


%!  make_private_socket is det.
%
%   Create our private socket.
%   This socket is used for messages that are directed to me. Note
%   that we only need this for broadcast and multicast networks. If we
%   use a unicast network we use our public port to contact this
%   specific server.

make_private_socket :-
    udp_private_socket(_Port, S, _F),
    !,
    (   (   udp_scope(Scope, broadcast(_,_,_))
        ;   udp_scope(Scope, multicast(_,_))
        ),
        \+ udp_closed(Scope)
    ->  true
    ;   tcp_close_socket(S),
        retractall(udp_private_socket(_,_,_))
    ).
make_private_socket :-
    udp_scope(_, broadcast(_,_,_)),
    !,
    udp_socket(S),
    tcp_bind(S, Port),
    tcp_getopt(S, file_no(F)),
    tcp_setopt(S, broadcast),
    assertz(udp_private_socket(Port, S, F)).
make_private_socket :-
    udp_scope(_, multicast(_,_)),
    !,
    udp_socket(S),
    tcp_bind(S, Port),
    tcp_getopt(S, file_no(F)),
    assertz(udp_private_socket(Port, S, F)).
make_private_socket.

%!  make_public_socket(+ScopeData, +Scope)
%
%   Create the public port for Scope.

make_public_socket(_, Scope) :-
    udp_public_socket(Scope, _Port, S, _),
    !,
    (   udp_closed(Scope)
    ->  tcp_close_socket(S),
        retractall(udp_public_socket(Scope, _, _, _))
    ;   true
    ).
make_public_socket(broadcast(_SubNet, _Broadcast, Port), Scope) :-
    udp_socket(S),
    tcp_setopt(S, reuseaddr),
    tcp_bind(S, Port),
    tcp_getopt(S, file_no(F)),
    assertz(udp_public_socket(Scope, Port, S, F)).
make_public_socket(multicast(Group, Port), Scope) :-
    udp_socket(S),
    tcp_setopt(S, reuseaddr),
    tcp_bind(S, Port),
    tcp_setopt(S, ip_add_membership(Group)),
    tcp_getopt(S, file_no(F)),
    assertz(udp_public_socket(Scope, Port, S, F)).
make_public_socket(unicast(Port), Scope) :-
    udp_socket(S),
    tcp_bind(S, Port),
    tcp_getopt(S, file_no(F)),
    assertz(udp_public_socket(Scope, Port, S, F)).

udp_socket_file_no(FileNo) :-
    udp_private_socket(_,_,FileNo).
udp_socket_file_no(FileNo) :-
    udp_public_socket(_,_,_,FileNo).

%!  dispatch_inbound(+FileNos)
%
%   Dispatch inbound traffic. This loop uses wait_for_input/3 to wait
%   for one or more UDP sockets and dispatches the requests using the
%   internal broadcast service. For an incoming broadcast _request_ we
%   send the reply only to the requester and therefore we must use a
%   socket that is not in broadcast mode.

dispatch_inbound(FileNos) :-
    debug(udp(broadcast), 'Waiting for ~p', [FileNos]),
    wait_for_input(FileNos, Ready, infinite),
    debug(udp(broadcast), 'Ready: ~p', [Ready]),
    maplist(dispatch_ready, Ready),
    dispatch_inbound(FileNos).

dispatch_ready(FileNo) :-
    udp_private_socket(_Port, Private, FileNo),
    !,
    udp_receive(Private, Data, From, [max_message_size(65535)]),
    debug(udp(broadcast), 'Inbound on private port', []),
    (   in_scope(Scope, From),
        udp_term_string(Scope, Term, Data) % only accept valid data
    ->  ld_dispatch(Private, Term, From, Scope)
    ;   true
    ).
dispatch_ready(FileNo) :-
    udp_public_socket(Scope, _PublicPort, Public, FileNo),
    !,
    udp_receive(Public, Data, From, [max_message_size(65535)]),
    debug(udp(broadcast), 'Inbound on public port from ~p for scope ~p',
          [From, Scope]),
    (   in_scope(Scope, From),
        udp_term_string(Scope, Term, Data) % only accept valid data
    ->  (   udp_scope(Scope, unicast(_))
        ->  ld_dispatch(Public, Term, From, Scope)
        ;   udp_private_socket(_PrivatePort, Private, _FileNo),
            ld_dispatch(Private, Term, From, Scope)
        )
    ;   udp_scope(Scope, unicast(_)),
        udp_term_string(Scope, Term, Data),
        unicast_out_of_scope_request(Scope, From, Term)
    ->  true
    ;   true
    ).

in_scope(Scope, Address) :-
    udp_scope(Scope, ScopeData),
    in_scope(ScopeData, Scope, Address),
    !.
in_scope(Scope, From) :-
    debug(udp(broadcast), 'Out-of-scope ~p datagram from ~p',
          [Scope, From]),
    fail.

in_scope(broadcast(Subnet, Broadcast, _PublicPort), _Scope, IP:_FromPort) :-
    udp_broadcast_address(IP, Subnet, Broadcast).
in_scope(multicast(_Group, _Port), _Scope, _From).
in_scope(unicast(_PublicPort), Scope, IP:_) :-
    udp_peer(Scope, IP:_).


%!  ld_dispatch(+PrivateSocket, +Term, +From, +Scope)
%
%   Locally dispatch Term received from From. If it concerns a broadcast
%   request, send the replies through PrivateSocket to From. The
%   multifile hook black_list/1 can be used to ignore certain messages.

ld_dispatch(_S, Term, From, _Scope) :-
    debug(udp(broadcast), 'ld_dispatch(~p) from ~p', [Term, From]),
    fail.
ld_dispatch(_S, Term, _From, _Scope) :-
    blacklisted(Term), !.
ld_dispatch(S, request(Key, Term), From, Scope) :-
    !,
    forall(safely(broadcast_request(Term)),
           safely((udp_term_string(Scope, reply(Key,Term), Message),
                   udp_send(S, Message, From, [])))).
ld_dispatch(_S, send(Term), _From, _Scope) :-
    safely(broadcast(Term)).
ld_dispatch(_S, reply(Key, Term), From, _Scope) :-
    (   reply_queue(Key, Queue)
    ->  safely(thread_send_message(Queue, Term:From))
    ;   true
    ).

blacklisted(send(Term))      :- black_list(Term).
blacklisted(request(_,Term)) :- black_list(Term).
blacklisted(reply(_,Term))   :- black_list(Term).


%!  reload_udp_proxy
%
%   Update the UDP relaying proxy service. The proxy consists of three
%   forwarding mechanisms:
%
%     - Listen on our _scope_. If any messages are received, hand them
%       to udp_broadcast/3 to be broadcast to _scope_ or sent to a
%       specific recipient.
%     - Listen on the _scope_ public port. Incoming messages are
%       relayed to the internal broadcast mechanism and replies are sent
%       from our private socket.
%     - Listen on our private port and reply using the same port.

reload_udp_proxy :-
    reload_outbound_proxy,
    reload_inbound_proxy.

reload_outbound_proxy :-
    listening(udp_broadcast, udp(_,_), _),
    !.
reload_outbound_proxy :-
    listen(udp_broadcast, udp(Scope,Message),
           udp_broadcast(Message, Scope, 0.25)),
    listen(udp_broadcast, udp(Scope,Message,Timeout),
           udp_broadcast(Message, Scope, Timeout)),
    listen(udp_broadcast, udp_subnet(Message),  % backward compatibility
           udp_broadcast(Message, subnet, 0.25)),
    listen(udp_broadcast, udp_subnet(Message,Timeout),
           udp_broadcast(Message, subnet, Timeout)).

reload_inbound_proxy :-
    catch(thread_signal(udp_inbound_proxy, throw(udp_reload)),
          error(existence_error(thread, _),_),
          fail),
    !.
reload_inbound_proxy :-
    thread_create(udp_inbound_proxy, _,
                  [ alias(udp_inbound_proxy),
                    detached(true)
                  ]).

%!  udp_broadcast_close(+Scope)
%
%   Close a UDP broadcast scope.

udp_broadcast_close(Scope) :-
    udp_scope(Scope, _ScopeData),
    !,
    assert(udp_closed(Scope)),
    reload_udp_proxy.
udp_broadcast_close(_).


%!  udp_broadcast(+What, +Scope, +TimeOut)
%
%   Send a broadcast request to my UDP peers in Scope. What is either a
%   plain `Term` or has the shape `Term:Address`. The latter sends Term
%   to a specific address or, if Address is unbound, unifies Address
%   with the address from which Term was answered.
%
%   If `Term` is nonground, it is considered a _request_ (see
%   broadcast_request/1) and the predicate succeeds for each answer
%   received within TimeOut seconds. If Term is ground it is considered
%   an asynchronous broadcast and udp_broadcast/3 is deterministic.

udp_broadcast(Term:To, Scope, _Timeout) :-
    ground(Term), ground(To),           % broadcast to single listener
    !,
    udp_basic_broadcast(send(Term), Scope, single(To)).
udp_broadcast(Term, Scope, _Timeout) :-
    ground(Term),                       % broadcast to all listeners
    !,
    udp_basic_broadcast(send(Term), Scope, broadcast).
udp_broadcast(Term:To, Scope, Timeout) :-
    ground(To),                         % request to single listener
    !,
    setup_call_cleanup(
        request_queue(Id, Queue),
        ( udp_basic_broadcast(request(Id, Term), Scope, single(To)),
          udp_br_collect_replies(Queue, Timeout, Term:To)
        ),
        destroy_request_queue(Queue)).
udp_broadcast(Term:From, Scope, Timeout) :-
    !,                                  % request to all listeners, collect sender
    setup_call_cleanup(
        request_queue(Id, Queue),
        ( udp_basic_broadcast(request(Id, Term), Scope, broadcast),
          udp_br_collect_replies(Queue, Timeout, Term:From)
        ),
        destroy_request_queue(Queue)).
udp_broadcast(Term, Scope, Timeout) :-  % request to all listeners
    udp_broadcast(Term:_, Scope, Timeout).

:- dynamic
    reply_queue/2.

request_queue(Id, Queue) :-
    Id is random(1<<63),
    message_queue_create(Queue),
    asserta(reply_queue(Id, Queue)).

destroy_request_queue(Queue) :-         % leave queue to GC
    retractall(reply_queue(_, Queue)).


%!  udp_basic_broadcast(+Term, +Scope, +Dest) is semidet.
%
%   Serialize Term using udp_term_string/3 and send it to Dest over
%   the socket registered for Scope: the public socket for a unicast
%   scope, the private socket otherwise.
%
%   @arg Dest is one of single(Target) or `broadcast`.

udp_basic_broadcast(Term, Scope, Dest) :-
    debug(udp(broadcast), 'UDP proxy outbound ~p to ~p', [Term, Dest]),
    udp_term_string(Scope, Term, String),
    udp_send_message(Dest, String, Scope).

udp_send_message(single(Address), String, Scope) :-
    (   udp_scope(Scope, unicast(_))
    ->  udp_public_socket(Scope, _Port, S, _)
    ;   udp_private_socket(_Port, S, _F)
    ),
    safely(udp_send(S, String, Address, [])).
udp_send_message(broadcast, String, Scope) :-
    (   udp_scope(Scope, unicast(_))
    ->  udp_public_socket(Scope, _Port, S, _),
        forall(udp_peer(Scope, Address),
               ( debug(udp(broadcast), 'Unicast to ~p', [Address]),
                 safely(udp_send(S, String, Address, []))))
    ;   udp_scope(Scope, broadcast(_SubNet, Broadcast, Port))
    ->  udp_private_socket(_PrivatePort, S, _F),
        udp_send(S, String, Broadcast:Port, [])
    ;   udp_scope(Scope, multicast(Group, Port))
    ->  udp_private_socket(_PrivatePort, S, _F),
        udp_send(S, String, Group:Port, [])
    ).

%!  udp_br_collect_replies(+Queue, +TimeOut, -TermAndFrom) is nondet.
%
%   Collect replies from Queue for TimeOut seconds. Succeed for each
%   received message.

udp_br_collect_replies(Queue, Timeout, Reply) :-
    get_time(Start),
    Deadline is Start+Timeout,
    repeat,
       (   thread_get_message(Queue, Reply,
                              [ deadline(Deadline)
                              ])
       ->  true
       ;   !,
           fail
       ).

%!  udp_broadcast_initialize(+IPAddress, +Options) is semidet.
%
%   Initialize the UDP broadcast bridge. IPAddress is the IP address on
%   the network we want to broadcast on. IP addresses are terms
%   ip(A,B,C,D) or an atom or string of the format =|A.B.C.D|=. Options
%   processed:
%
%     - scope(+ScopeName)
%       Name of the scope. Default is `subnet`.
%     - subnet_mask(+SubNet)
%       Subnet to broadcast on. This uses the same syntax as IPAddress.
%       Default classifies the network as class A, B or C depending on
%       the first octet and applies the default mask.
%     - port(+Port)
%       Public port to use. Default is 20005.
%     - method(+Method)
%       Method to send a message to multiple peers. One of
%       - broadcast
%         Use UDP broadcast messages to the LAN. This is the
%         default.
%       - multicast
%         Use UDP multicast messages. This can be used on WAN networks,
%         provided the intermediate routers understand multicast.
%       - unicast
%         Send the messages individually to all registered peers.
%
%   For compatibility reasons Options may be the subnet mask.

udp_broadcast_initialize(IP, Options) :-
    with_mutex(udp_broadcast,
               udp_broadcast_initialize_sync(IP, Options)).

udp_broadcast_initialize_sync(IP, Options) :-
    nonvar(Options),
    Options = ip(_,_,_,_),
    !,
    udp_broadcast_initialize(IP, [subnet_mask(Options)]).
udp_broadcast_initialize_sync(IP, Options) :-
    to_ip4(IP, IPAddress),
    option(method(Method), Options, broadcast),
    must_be(oneof([broadcast, multicast, unicast]), Method),
    udp_broadcast_initialize_sync(Method, IPAddress, Options),
    reload_udp_proxy.

udp_broadcast_initialize_sync(broadcast, IPAddress, Options) :-
    option(subnet_mask(Subnet), Options, _),
    mk_subnet(Subnet, IPAddress, Subnet4),
    option(port(Port), Options, 20005),
    option(scope(Scope), Options, subnet),

    udp_broadcast_address(IPAddress, Subnet4, Broadcast),
    udp_broadcast_close(Scope),
    assertz(udp_scope(Scope, broadcast(Subnet4, Broadcast, Port))).
udp_broadcast_initialize_sync(unicast, _IPAddress, Options) :-
    option(port(Port), Options, 20005),
    option(scope(Scope), Options, subnet),
    udp_broadcast_close(Scope),
    assertz(udp_scope(Scope, unicast(Port))).
udp_broadcast_initialize_sync(multicast, IPAddress, Options) :-
    option(port(Port), Options, 20005),
    option(scope(Scope), Options, subnet),
    udp_broadcast_close(Scope),
    multicast_address(IPAddress),
    assertz(udp_scope(Scope, multicast(IPAddress, Port))).

to_ip4(Atomic, ip(A,B,C,D)) :-
    atomic(Atomic),
    !,
    (   split_string(Atomic, ".", "", Strings),
        maplist(number_string, [A,B,C,D], Strings)
    ->  true
    ;   syntax_error(illegal_ip_address)
    ).
to_ip4(IP, IP).

mk_subnet(Var, IP, Subnet) :-
    var(Var),
    !,
    (   default_subnet(IP, Subnet)
    ->  true
    ;   domain_error(ip_with_subnet, IP)
    ).
mk_subnet(Subnet, _, Subnet4) :-
    to_ip4(Subnet, Subnet4).

default_subnet(ip(A,_,_,_), ip(A,0,0,0)) :-
    between(1,126, A), !.
default_subnet(ip(A,B,_,_), ip(A,B,0,0)) :-
    between(128,191, A), !.
default_subnet(ip(A,B,C,_), ip(A,B,C,0)) :-
    between(192,223, A), !.

multicast_address(ip(A,_,_,_)) :-
    between(224, 239, A),
    !.
multicast_address(IP) :-
    domain_error(multicast_network, IP).


                 /*******************************
                 *         UNICAST PEERS        *
                 *******************************/

%!  udp_peer_add(+Scope, +Address) is det.
%!  udp_peer_del(+Scope, ?Address) is det.
%!  udp_peer(?Scope, ?Address) is nondet.
%
%   Manage and query the set of known peers for a unicast network.
%   Address is either a term IP:Port or a plain IP address. In the
%   latter case the default port registered with the scope is used.
%
%   @arg Address has canonical form ip(A,B,C,D):Port.

udp_peer_add(Scope, Address) :-
    must_be(ground, Address),
    peer_address(Address, Scope, Canonical),
    (   udp_scope_peer(Scope, Canonical)
    ->  true
    ;   assertz(udp_scope_peer(Scope, Canonical))
    ).

udp_peer_del(Scope, Address) :-
    peer_address(Address, Scope, Canonical),
    retractall(udp_scope_peer(Scope, Canonical)).

udp_peer(Scope, IPAddress) :-
    udp_scope_peer(Scope, IPAddress).

peer_address(IP:Port, _Scope, IPAddress:Port) :-
    !,
    to_ip4(IP, IPAddress).
peer_address(IP, Scope, IPAddress:Port) :-
    (   udp_scope(Scope, unicast(Port))
    ->  true
    ;   existence_error(udp_scope, Scope)
    ),
    to_ip4(IP, IPAddress).



                 /*******************************
                 *             HOOKS            *
                 *******************************/

%!  udp_term_string_hook(+Scope, +Term, -String) is det.
%!  udp_term_string_hook(+Scope, -Term, +String) is semidet.
%
%   Hook for serializing the message Term.
%   The default writes =|%-prolog-\n|=, followed by the Prolog term in
%   quoted notation while ignoring operators. This hook may use
%   alternative serialization such as fast_term_serialized/2, use
%   library(ssl) to realise encrypted messages, etc.
%
%   @arg Scope is the scope for which the message is broadcast. This
%   can be used to use different serialization for different scopes.
%   @arg Term encapsulates the term broadcast by the application as
%   follows:
%
%     - send(ApplTerm)
%       Is sent by broadcast(udp(Scope, ApplTerm))
%     - request(Id,ApplTerm)
%       Is sent by broadcast_request/1, where Id is a unique large
%       (64 bit) integer.
%     - reply(Id,ApplTerm)
%       Is sent in reply to a broadcast_request/1 request that has
%       been received. Arguments are the same as above.

%!  udp_term_string(+Scope, +Term, -String) is det.
%!  udp_term_string(+Scope, -Term, +String) is semidet.
%
%   Serialize an arbitrary Prolog term as a string. The string is
%   prefixed by a magic key to ensure we only accept messages that are
%   meant for us.
%
%   In mode (+,-), Term is written with the options ignore_ops(true) and
%   quoted(true).
%
%   This predicate first calls udp_term_string_hook/3.

udp_term_string(Scope, Term, String) :-
    udp_term_string_hook(Scope, Term, String),
    !.
udp_term_string(_Scope, Term, String) :-
    (   var(String)
    ->  format(string(String), '%-prolog-\n~W',
               [ Term,
                 [ ignore_ops(true),
                   quoted(true)
                 ]
               ])
    ;   sub_string(String, 0, _, _, '%-prolog-\n'),
        term_string(Term, String,
                    [ syntax_errors(quiet)
                    ])
    ).

%!  unicast_out_of_scope_request(+Scope, +From, +Data) is semidet.

%!  udp_unicast_join_hook(+Scope, +From, +Data) is semidet.
%
%   This multifile hook is called if a UDP packet is received on the
%   port of the unicast network identified by Scope.
%   From is the origin IP and port and Data is the message data that is
%   deserialized as defined for the scope (see udp_term_string/3).
%
%   This hook is intended to initiate a new node joining the network of
%   peers. We could in theory also omit the in-scope test and use a
%   normal broadcast to join. Using a different channel however provides
%   a basic level of security. A possible implementation is below. The
%   first fragment is a hook added to the server, the second is a
%   predicate added to a client and the last initiates the request in
%   the client. The exchanged term (join(X)) can be used to exchange a
%   welcome handshake.
%
%
%   ```
%   :- multifile udp_broadcast:udp_unicast_join_hook/3.
%   udp_broadcast:udp_unicast_join_hook(Scope, From, join(welcome)) :-
%       udp_peer_add(Scope, From).
%   ```
%
%   ```
%   join_request(Scope, Address, Reply) :-
%       udp_peer_add(Scope, Address),
%       broadcast_request(udp(Scope, join(Reply))).
%   ```
%
%   ```
%   ?- join_request(myscope, "1.2.3.4":10001, Reply).
%   Reply = welcome.
%   ```

unicast_out_of_scope_request(Scope, From, send(Term)) :-
    udp_unicast_join_hook(Scope, From, Term).
unicast_out_of_scope_request(Scope, From, request(Key, Term)) :-
    udp_unicast_join_hook(Scope, From, Term),
    udp_public_socket(Scope, _Port, Socket, _FileNo),
    safely((udp_term_string(Scope, reply(Key,Term), Message),
            udp_send(Socket, Message, From, []))).