1/* Part of SWI-Prolog 2 3 Author: Jeffrey Rosenwald and Jan Wielemaker 4 E-mail: jeffrose@acm.org 5 WWW: http://www.swi-prolog.org 6 Copyright (c) 2012-2013, Jeffrey Rosenwald 7 2018, CWI Amsterdam 8 All rights reserved. 9 10 Redistribution and use in source and binary forms, with or without 11 modification, are permitted provided that the following conditions 12 are met: 13 14 1. Redistributions of source code must retain the above copyright 15 notice, this list of conditions and the following disclaimer. 16 17 2. Redistributions in binary form must reproduce the above copyright 18 notice, this list of conditions and the following disclaimer in 19 the documentation and/or other materials provided with the 20 distribution. 21 22 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 23 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 24 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 25 FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 26 COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 27 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 28 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 29 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 30 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 31 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 32 ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 33 POSSIBILITY OF SUCH DAMAGE. 34*/ 35 36:- module(udp_broadcast, 37 [ udp_broadcast_initialize/2, % +IPAddress, +Options 38 udp_broadcast_close/1, % +Scope 39 40 udp_peer_add/2, % +Scope, +IP 41 udp_peer_del/2, % +Scope, ?IP 42 udp_peer/2 % +Scope, -IP 43 ]). 44:- use_module(library(socket)). 45:- use_module(library(broadcast)). 46:- use_module(library(option)). 47:- use_module(library(apply)). 48:- use_module(library(debug)). 49:- use_module(library(error)). 50 51% :- debug(udp(broadcast)).
267:- multifile 268 udp_term_string_hook/3, % +Scope, ?Term, ?String 269 udp_unicast_join_hook/3, % +Scope, +From, +Data 270 black_list/1. % +Term 271 272:- meta_predicate safely( ). 273 274safely(Predicate) :- 275 catch(Predicate, Err, 276 ( Err == '$aborted' 277 -> !, fail 278 ; print_message(error, Err), fail 279 )). 280 281udp_broadcast_address(IPAddress, Subnet, BroadcastAddress) :- 282 IPAddress = ip(A1, A2, A3, A4), 283 Subnet = ip(S1, S2, S3, S4), 284 BroadcastAddress = ip(B1, B2, B3, B4), 285 286 B1 is A1 \/ (S1 xor 255), 287 B2 is A2 \/ (S2 xor 255), 288 B3 is A3 \/ (S3 xor 255), 289 B4 is A4 \/ (S4 xor 255).
udp_subnet
.298:- dynamic 299 udp_scope/2, 300 udp_scope_peer/2. 301:- volatile 302 udp_scope/2, 303 udp_scope_peer/2. 304% 305% Here's a UDP proxy to Prolog's broadcast library 306% 307% A sender may extend a broadcast to a subnet of a UDP network by 308% specifying a =|udp_subnet|= scoping qualifier in his/her broadcast. 309% The qualifier has the effect of selecting the appropriate multi-cast 310% address for the transmission. Thus, the sender of the message has 311% control over the scope of his/her traffic on a per-message basis. 312% 313% All in-scope listeners receive the broadcast and simply rebroadcast 314% the message locally. All broadcast replies, if any, are sent directly 315% to the sender via the port-id that was received with the broadcast. 316% 317% Each listener exposes two UDP ports, a shared public port that is 318% bound to a well-known port number and a private port that uniquely 319% indentifies the listener. Broadcasts are received on the public port 320% and replies are sent on the private port. Directed broadcasts 321% (unicasts) are received on the private port and replies are sent on 322% the private port. 323 324% Thread 1 listens for directed traffic on the private port. 325% 326 327:- dynamic 328 udp_private_socket/3, % Port, Socket, FileNo 329 udp_public_socket/4, % Scope, Port, Socket, FileNo 330 udp_closed/1. % Scope 331 332udp_inbound_proxy :- 333 make_private_socket, 334 forall(udp_scope(Scope, ScopeData), 335 make_public_socket(ScopeData, Scope)), 336 retractall(udp_closed(_)), 337 findall(FileNo, udp_socket_file_no(FileNo), FileNos), 338 catch(dispatch_inbound(FileNos), 339 E, dispatch_exception(E)), 340 udp_inbound_proxy. 341 342dispatch_exception(E) :- 343 E = error(_,_), 344 !, 345 print_message(warning, E). 346dispatch_exception(_).
356make_private_socket :- 357 udp_private_socket(_Port, S, _F), 358 !, 359 ( ( udp_scope(Scope, broadcast(_,_,_)) 360 ; udp_scope(Scope, multicast(_,_)) 361 ), 362 \+ udp_closed(Scope) 363 -> true 364 ; tcp_close_socket(S), 365 retractall(udp_private_socket(_,_,_)) 366 ). 367make_private_socket :- 368 udp_scope(_, broadcast(_,_,_)), 369 !, 370 udp_socket(S), 371 tcp_bind(S, Port), 372 tcp_getopt(S, file_no(F)), 373 tcp_setopt(S, broadcast), 374 assertz(udp_private_socket(Port, S, F)). 375make_private_socket :- 376 udp_scope(_, multicast(_,_)), 377 !, 378 udp_socket(S), 379 tcp_bind(S, Port), 380 tcp_getopt(S, file_no(F)), 381 assertz(udp_private_socket(Port, S, F)). 382make_private_socket.
388make_public_socket(_, Scope) :- 389 udp_public_socket(Scope, _Port, S, _), 390 !, 391 ( udp_closed(Scope) 392 -> tcp_close_socket(S), 393 retractall(udp_public_socket(Scope, _, _, _)) 394 ; true 395 ). 396make_public_socket(broadcast(_SubNet, _Broadcast, Port), Scope) :- 397 udp_socket(S), 398 tcp_setopt(S, reuseaddr), 399 tcp_bind(S, Port), 400 tcp_getopt(S, file_no(F)), 401 assertz(udp_public_socket(Scope, Port, S, F)). 402make_public_socket(multicast(Group, Port), Scope) :- 403 udp_socket(S), 404 tcp_setopt(S, reuseaddr), 405 tcp_bind(S, Port), 406 tcp_setopt(S, ip_add_membership(Group)), 407 tcp_getopt(S, file_no(F)), 408 assertz(udp_public_socket(Scope, Port, S, F)). 409make_public_socket(unicast(Port), Scope) :- 410 udp_socket(S), 411 tcp_bind(S, Port), 412 tcp_getopt(S, file_no(F)), 413 assertz(udp_public_socket(Scope, Port, S, F)). 414 415udp_socket_file_no(FileNo) :- 416 udp_private_socket(_,_,FileNo). 417udp_socket_file_no(FileNo) :- 418 udp_public_socket(_,_,_,FileNo).
428dispatch_inbound(FileNos) :- 429 debug(udp(broadcast), 'Waiting for ~p', [FileNos]), 430 wait_for_input(FileNos, Ready, infinite), 431 debug(udp(broadcast), 'Ready: ~p', [Ready]), 432 maplist(dispatch_ready, Ready), 433 dispatch_inbound(FileNos). 434 435dispatch_ready(FileNo) :- 436 udp_private_socket(_Port, Private, FileNo), 437 !, 438 udp_receive(Private, Data, From, [max_message_size(65535)]), 439 debug(udp(broadcast), 'Inbound on private port', []), 440 ( in_scope(Scope, From), 441 udp_term_string(Scope, Term, Data) % only accept valid data 442 -> ld_dispatch(Private, Term, From, Scope) 443 ; true 444 ). 445dispatch_ready(FileNo) :- 446 udp_public_socket(Scope, _PublicPort, Public, FileNo), 447 !, 448 udp_receive(Public, Data, From, [max_message_size(65535)]), 449 debug(udp(broadcast), 'Inbound on public port from ~p for scope ~p', 450 [From, Scope]), 451 ( in_scope(Scope, From), 452 udp_term_string(Scope, Term, Data) % only accept valid data 453 -> ( udp_scope(Scope, unicast(_)) 454 -> ld_dispatch(Public, Term, From, Scope) 455 ; udp_private_socket(_PrivatePort, Private, _FileNo), 456 ld_dispatch(Private, Term, From, Scope) 457 ) 458 ; udp_scope(Scope, unicast(_)), 459 udp_term_string(Scope, Term, Data), 460 unicast_out_of_scope_request(Scope, From, Term) 461 -> true 462 ; true 463 ). 464 465in_scope(Scope, Address) :- 466 udp_scope(Scope, ScopeData), 467 in_scope(ScopeData, Scope, Address), 468 !. 469in_scope(Scope, From) :- 470 debug(udp(broadcast), 'Out-of-scope ~p datagram from ~p', 471 [Scope, From]), 472 fail. 473 474in_scope(broadcast(Subnet, Broadcast, _PublicPort), _Scope, IP:_FromPort) :- 475 udp_broadcast_address(IP, Subnet, Broadcast). 476in_scope(multicast(_Group, _Port), _Scope, _From). 477in_scope(unicast(_PublicPort), Scope, IP:_) :- 478 udp_peer(Scope, IP:_).
487ld_dispatch(_S, Term, From, _Scope) :- 488 debug(udp(broadcast), 'ld_dispatch(~p) from ~p', [Term, From]), 489 fail. 490ld_dispatch(_S, Term, _From, _Scope) :- 491 blacklisted(Term), !. 492ld_dispatch(S, request(Key, Term), From, Scope) :- 493 !, 494 forall(safely(broadcast_request(Term)), 495 safely((udp_term_string(Scope, reply(Key,Term), Message), 496 udp_send(S, Message, From, [])))). 497ld_dispatch(_S, send(Term), _From, _Scope) :- 498 safely(broadcast(Term)). 499ld_dispatch(_S, reply(Key, Term), From, _Scope) :- 500 ( reply_queue(Key, Queue) 501 -> safely(thread_send_message(Queue, Term:From)) 502 ; true 503 ). 504 505blacklisted(send(Term)) :- black_list(Term). 506blacklisted(request(_,Term)) :- black_list(Term). 507blacklisted(reply(_,Term)) :- black_list(Term).
523reload_udp_proxy :- 524 reload_outbound_proxy, 525 reload_inbound_proxy. 526 527reload_outbound_proxy :- 528 listening(udp_broadcast, udp(_,_), _), 529 !. 530reload_outbound_proxy :- 531 listen(udp_broadcast, udp(Scope,Message), 532 udp_broadcast(Message, Scope, 0.25)), 533 listen(udp_broadcast, udp(Scope,Message,Timeout), 534 udp_broadcast(Message, Scope, Timeout)), 535 listen(udp_broadcast, udp_subnet(Message), % backward compatibility 536 udp_broadcast(Message, subnet, 0.25)), 537 listen(udp_broadcast, udp_subnet(Message,Timeout), 538 udp_broadcast(Message, subnet, Timeout)). 539 540reload_inbound_proxy :- 541 catch(thread_signal(udp_inbound_proxy, throw(udp_reload)), 542 error(existence_error(thread, _),_), 543 fail), 544 !. 545reload_inbound_proxy :- 546 thread_create(udp_inbound_proxy, _, 547 [ alias(udp_inbound_proxy), 548 detached(true) 549 ]).
555udp_broadcast_close(Scope) :- 556 udp_scope(Scope, _ScopeData), 557 !, 558 assert(udp_closed(Scope)), 559 reload_udp_proxy. 560udp_broadcast_close(_).
Term:Address
to send Term to a specific address or query
the address from which term is answered or it is a plain Term.
If Term is nonground, it is considered is a request (see broadcast_request/1) and the predicate succeeds for each answer received within TimeOut seconds. If Term is ground it is considered an asynchronous broadcast and udp_broadcast/3 is deterministic.
574udp_broadcast(Term:To, Scope, _Timeout) :- 575 ground(Term), ground(To), % broadcast to single listener 576 !, 577 udp_basic_broadcast(send(Term), Scope, single(To)). 578udp_broadcast(Term, Scope, _Timeout) :- 579 ground(Term), % broadcast to all listeners 580 !, 581 udp_basic_broadcast(send(Term), Scope, broadcast). 582udp_broadcast(Term:To, Scope, Timeout) :- 583 ground(To), % request to single listener 584 !, 585 setup_call_cleanup( 586 request_queue(Id, Queue), 587 ( udp_basic_broadcast(request(Id, Term), Scope, single(To)), 588 udp_br_collect_replies(Queue, Timeout, Term:To) 589 ), 590 destroy_request_queue(Queue)). 591udp_broadcast(Term:From, Scope, Timeout) :- 592 !, % request to all listeners, collect sender 593 setup_call_cleanup( 594 request_queue(Id, Queue), 595 ( udp_basic_broadcast(request(Id, Term), Scope, broadcast), 596 udp_br_collect_replies(Queue, Timeout, Term:From) 597 ), 598 destroy_request_queue(Queue)). 599udp_broadcast(Term, Scope, Timeout) :- % request to all listeners 600 udp_broadcast(Term:_, Scope, Timeout). 601 602:- dynamic 603 reply_queue/2. 604 605request_queue(Id, Queue) :- 606 Id is random(1<<63), 607 message_queue_create(Queue), 608 asserta(reply_queue(Id, Queue)). 609 610destroy_request_queue(Queue) :- % leave queue to GC 611 retractall(reply_queue(_, Queue)).
This predicate succeeds with a choice point. Committing the choice point closes S.
624udp_basic_broadcast(Term, Scope, Dest) :- 625 debug(udp(broadcast), 'UDP proxy outbound ~p to ~p', [Term, Dest]), 626 udp_term_string(Scope, Term, String), 627 udp_send_message(Dest, String, Scope). 628 629udp_send_message(single(Address), String, Scope) :- 630 ( udp_scope(Scope, unicast(_)) 631 -> udp_public_socket(Scope, _Port, S, _) 632 ; udp_private_socket(_Port, S, _F) 633 ), 634 safely(udp_send(S, String, Address, [])). 635udp_send_message(broadcast, String, Scope) :- 636 ( udp_scope(Scope, unicast(_)) 637 -> udp_public_socket(Scope, _Port, S, _), 638 forall(udp_peer(Scope, Address), 639 ( debug(udp(broadcast), 'Unicast to ~p', [Address]), 640 safely(udp_send(S, String, Address, [])))) 641 ; udp_scope(Scope, broadcast(_SubNet, Broadcast, Port)) 642 -> udp_private_socket(_PrivatePort, S, _F), 643 udp_send(S, String, Broadcast:Port, []) 644 ; udp_scope(Scope, multicast(Group, Port)) 645 -> udp_private_socket(_PrivatePort, S, _F), 646 udp_send(S, String, Group:Port, []) 647 ). 648 649% ! udp_br_collect_replies(+Queue, +TimeOut, -TermAndFrom) is nondet. 650% 651% Collect replies on Socket for TimeOut seconds. Succeed for each 652% received message. 653 654udp_br_collect_replies(Queue, Timeout, Reply) :- 655 get_time(Start), 656 Deadline is Start+Timeout, 657 repeat, 658 ( thread_get_message(Queue, Reply, 659 [ deadline(Deadline) 660 ]) 661 -> true 662 ; !, 663 fail 664 ).
ip(A,B,C,D)
or an atom or string of the format A.B.C.D
. Options processed:
subnet
.For compatibility reasons Options may be the subnet mask.
693udp_broadcast_initialize(IP, Options) :- 694 with_mutex(udp_broadcast, 695 udp_broadcast_initialize_sync(IP, Options)). 696 697udp_broadcast_initialize_sync(IP, Options) :- 698 nonvar(Options), 699 Options = ip(_,_,_,_), 700 !, 701 udp_broadcast_initialize(IP, [subnet_mask(Options)]). 702udp_broadcast_initialize_sync(IP, Options) :- 703 to_ip4(IP, IPAddress), 704 option(method(Method), Options, broadcast), 705 must_be(oneof([broadcast, multicast, unicast]), Method), 706 udp_broadcast_initialize_sync(Method, IPAddress, Options), 707 reload_udp_proxy. 708 709udp_broadcast_initialize_sync(broadcast, IPAddress, Options) :- 710 option(subnet_mask(Subnet), Options, _), 711 mk_subnet(Subnet, IPAddress, Subnet4), 712 option(port(Port), Options, 20005), 713 option(scope(Scope), Options, subnet), 714 715 udp_broadcast_address(IPAddress, Subnet4, Broadcast), 716 udp_broadcast_close(Scope), 717 assertz(udp_scope(Scope, broadcast(Subnet4, Broadcast, Port))). 718udp_broadcast_initialize_sync(unicast, _IPAddress, Options) :- 719 option(port(Port), Options, 20005), 720 option(scope(Scope), Options, subnet), 721 udp_broadcast_close(Scope), 722 assertz(udp_scope(Scope, unicast(Port))). 723udp_broadcast_initialize_sync(multicast, IPAddress, Options) :- 724 option(port(Port), Options, 20005), 725 option(scope(Scope), Options, subnet), 726 udp_broadcast_close(Scope), 727 multicast_address(IPAddress), 728 assertz(udp_scope(Scope, multicast(IPAddress, Port))). 729 730to_ip4(Atomic, ip(A,B,C,D)) :- 731 atomic(Atomic), 732 !, 733 ( split_string(Atomic, ".", "", Strings), 734 maplist(number_string, [A,B,C,D], Strings) 735 -> true 736 ; syntax_error(illegal_ip_address) 737 ). 738to_ip4(IP, IP). 739 740mk_subnet(Var, IP, Subnet) :- 741 var(Var), 742 !, 743 ( default_subnet(IP, Subnet) 744 -> true 745 ; domain_error(ip_with_subnet, IP) 746 ). 747mk_subnet(Subnet, _, Subnet4) :- 748 to_ip4(Subnet, Subnet4). 749 750default_subnet(ip(A,_,_,_), ip(A,0,0,0)) :- 751 between(1,126, A), !. 752default_subnet(ip(A,B,_,_), ip(A,B,0,0)) :- 753 between(128,191, A), !. 754default_subnet(ip(A,B,C,_), ip(A,B,C,0)) :- 755 between(192,223, A), !. 756 757multicast_address(ip(A,_,_,_)) :- 758 between(224, 239, A), 759 !. 760multicast_address(IP) :- 761 domain_error(multicast_network, IP). 762 763 764 /******************************* 765 * UNICAST PEERS * 766 *******************************/
778udp_peer_add(Scope, Address) :- 779 must_be(ground, Address), 780 peer_address(Address, Scope, Canonical), 781 ( udp_scope_peer(Scope, Canonical) 782 -> true 783 ; assertz(udp_scope_peer(Scope, Canonical)) 784 ). 785 786udp_peer_del(Scope, Address) :- 787 peer_address(Address, Scope, Canonical), 788 retractall(udp_scope_peer(Scope, Canonical)). 789 790udp_peer(Scope, IPAddress) :- 791 udp_scope_peer(Scope, IPAddress). 792 793peer_address(IP:Port, _Scope, IPAddress:Port) :- 794 !, 795 to_ip4(IP, IPAddress). 796peer_address(IP, Scope, IPAddress:Port) :- 797 ( udp_scope(Scope, unicast(Port)) 798 -> true 799 ; existence_error(udp_scope, Scope) 800 ), 801 to_ip4(IP, IPAddress). 802 803 804 805 /******************************* 806 * HOOKS * 807 *******************************/
%prolog\n
, followed by the Prolog term in quoted notation while
ignoring operators. This hook may use alternative serialization such
as fast_term_serialized/2, use library(ssl)
to realise encrypted
messages, etc.
In mode (+,-), Term is written with the options ignore_ops(true)
and
quoted(true)
.
This predicate first calls udp_term_string_hook/3.
844udp_term_string(Scope, Term, String) :- 845 udp_term_string_hook(Scope, Term, String), 846 !. 847udp_term_string(_Scope, Term, String) :- 848 ( var(String) 849 -> format(string(String), '%-prolog-\n~W', 850 [ Term, 851 [ ignore_ops(true), 852 quoted(true) 853 ] 854 ]) 855 ; sub_string(String, 0, _, _, '%-prolog-\n'), 856 term_string(Term, String, 857 [ syntax_errors(quiet) 858 ]) 859 ).
This hook is intended to initiate a new node joining the network of
peers. We could in theory also omit the in-scope test and use a
normal broadcast to join. Using a different channal however provides
a basic level of security. A possibe implementation is below. The
first fragment is a hook added to the server, the second is a
predicate added to a client and the last initiates the request in
the client. The excanged term (join(X)
) can be used to exchange a
welcome handshake.
:- multifile udp_broadcast:udp_unicast_join_hook/3. udp_broadcast:udp_unicast_join_hook(Scope, From, join(welcome)) :- udp_peer_add(Scope, From),
join_request(Scope, Address, Reply) :- udp_peer_add(Scope, Address), broadcast_request(udp(Scope, join(X))).
?- join_request(myscope, "1.2.3.4":10001, Reply). Reply = welcome.
897unicast_out_of_scope_request(Scope, From, send(Term)) :- 898 udp_unicast_join_hook(Scope, From, Term). 899unicast_out_of_scope_request(Scope, From, request(Key, Term)) :- 900 udp_unicast_join_hook(Scope, From, Term), 901 udp_public_socket(Scope, _Port, Socket, _FileNo), 902 safely((udp_term_string(Scope, reply(Key,Term), Message), 903 udp_send(Socket, Message, From, [])))
A UDP broadcast proxy
SWI-Prolog's broadcast library provides a means that may be used to facilitate publish and subscribe communication regimes between anonymous members of a community of interest. The members of the community are however, necessarily limited to a single instance of Prolog. The UDP broadcast library removes that restriction. With this library loaded, any member on your local IP subnetwork that also has this library loaded may hear and respond to your broadcasts.
This library support three styles of networking as described below. Each of these networks have their own advantages and disadvantages. Please study the literature to understand the consequences.
After initialization and, in the case of a unicast network managing the set of peers, communication happens through broadcast/1, broadcast_request/1 and listen/1,2,3.
A broadcast/1 or broadcast_request/1 of the shape
udp(Scope, Term)
orudp(Scope, Term, TimeOut)
is forwarded over the UDP network to all peers that joined the same Scope. To prevent the potential for feedback loops, only the plain Term is broadcasted locally. The timeout is optional. It specifies the amount to time to wait for replies to arrive in response to a broadcast_request/1. The default period is 0.250 seconds. The timeout is ignored for broadcasts.An example of three separate processes cooperating in the same scope called
peers
:It is also possible to carry on a private dialog with a single responder. To do this, you supply a compound of the form, Term:PortId, to a UDP scoped broadcast/1 or broadcast_request/1, where PortId is the ip-address and port-id of the intended listener. If you supply an unbound variable, PortId, to broadcast_request, it will be unified with the address of the listener that responds to Term. You may send a directed broadcast to a specific member by simply providing this address in a similarly structured compound to a UDP scoped broadcast/1. The message is sent via unicast to that member only by way of the member's broadcast listener. It is received by the listener just as any other broadcast would be. The listener does not know the difference.
For example, in order to discover who responded with a particular value:
All incomming trafic is handled by a single thread with the alias
udp_inbound_proxy
. This thread also performs the internal dispatching using broadcast/1 and broadcast_request/1. Future versions may provide for handling these requests in seperate threads.Caveats
While the implementation is mostly transparent, there are some important and subtle differences that must be taken into consideration:
udp_subnet
scope is not reentrant. If a listener performs a broadcast_request/1 with UDP scope recursively, then disaster looms certain. This caveat does not apply to a UDP scoped broadcast/1, which can safely be performed from a listener context.tipc.pl
*/