/*  Part of SWI-Prolog

    Author:        Jeffrey Rosenwald and Jan Wielemaker
    E-mail:        jeffrose@acm.org
    WWW:           http://www.swi-prolog.org
    Copyright (c)  2012-2013, Jeffrey Rosenwald
                   2018, CWI Amsterdam
    All rights reserved.

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions
    are met:

    1. Redistributions of source code must retain the above copyright
       notice, this list of conditions and the following disclaimer.

    2. Redistributions in binary form must reproduce the above copyright
       notice, this list of conditions and the following disclaimer in
       the documentation and/or other materials provided with the
       distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
    POSSIBILITY OF SUCH DAMAGE.
*/

:- module(udp_broadcast,
          [ udp_broadcast_initialize/2,         % +IPAddress, +Options
            udp_broadcast_close/1,              % +Scope

            udp_peer_add/2,                     % +Scope, +IP
            udp_peer_del/2,                     % +Scope, ?IP
            udp_peer/2                          % +Scope, -IP
          ]).
:- use_module(library(socket)).
:- use_module(library(broadcast)).
:- use_module(library(option)).
:- use_module(library(apply)).
:- use_module(library(debug)).
:- use_module(library(error)).

% :- debug(udp(broadcast)).

/** <module> A UDP broadcast proxy

SWI-Prolog's broadcast library provides a  means   that  may  be used to
facilitate publish and subscribe communication regimes between anonymous
members of a community of interest.  The  members of the community are,
however, necessarily limited to a  single   instance  of Prolog. The UDP
broadcast library removes that restriction.   With  this library loaded,
any member on your local IP subnetwork that also has this library loaded
may hear and respond to your broadcasts.

This library supports three styles of networking as described below. Each
of these networks has its own advantages and disadvantages. Please
study the literature to understand the consequences.

  $ broadcast :
  Broadcast messages are sent to the LAN subnet. The broadcast
  implementation uses two UDP ports: a public one to address the whole
  group and a private one to address a specific node.  Broadcasting
  is generally a good choice if the subnet is small and traffic is
  low.

  $ unicast :
  Unicast sends copies of packages to known peers.  Unicast networks
  can easily be routed.  The unicast version uses a single UDP port
  per node.  Unicast is generally a good choice for a small party,
  in particular if the peers are in different networks.

  $ multicast :
  Multicast is like broadcast, but it can be configured to
  work across networks and may work more efficiently on VLAN networks.
  Like the broadcast setup, two UDP ports are used.  Multicasting can
  in general deliver the most efficient LAN and WAN networks, but
  requires properly configured routing between the peers.
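
Each style is selected with the `method` option of
udp_broadcast_initialize/2. The sketch below shows one possible setup per
style. The helper names, addresses and scope names are hypothetical
values chosen for illustration; only the option names are taken from
this library.

```prolog
:- use_module(library(udp_broadcast)).

% Hypothetical helpers: one initializer per networking style.
% All addresses and scope names below are example values.

% Broadcast on the LAN of 192.168.1.103 with an explicit subnet mask.
init_broadcast :-
    udp_broadcast_initialize(ip(192,168,1,103),
                             [ method(broadcast),
                               subnet_mask(ip(255,255,255,0)),
                               scope(peers)
                             ]).

% Unicast: create the scope first, then register the known peers.
init_unicast :-
    udp_broadcast_initialize(ip(192,168,1,103),
                             [ method(unicast),
                               port(20005),
                               scope(party)
                             ]),
    udp_peer_add(party, ip(192,168,1,104)),     % uses the scope port
    udp_peer_add(party, ip(10,0,0,7):20006).    % explicit port

% Multicast: the address must be a multicast group (first octet 224..239).
init_multicast :-
    udp_broadcast_initialize(ip(239,0,0,2),
                             [ method(multicast),
                               scope(group)
                             ]).
```

For a unicast scope the peers are registered after initialization,
because a plain IP address passed to udp_peer_add/2 takes its port from
the scope definition.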

After initialization and, in the case of a _unicast_ network, managing
the set of peers, communication happens through broadcast/1,
broadcast_request/1, listen/2 and listen/3.

A broadcast/1 or broadcast_request/1 of the   shape  udp(Scope, Term) or
udp(Scope, Term, TimeOut) is forwarded over the UDP network to all peers
that joined the same `Scope`.  To   prevent  the  potential for feedback
loops, only the plain `Term` is broadcast locally. The timeout is
optional. It specifies the amount of time to wait for replies to arrive
in response to a  broadcast_request/1.  The   default  period  is  0.250
seconds. The timeout is ignored for broadcasts.
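
The difference between the two shapes can be sketched as follows. This
is a minimal illustration that assumes a scope named `peers` has been
initialized elsewhere; announce/1, collect_numbers/1 and first_number/1
are hypothetical names, not part of this library.

```prolog
:- use_module(library(broadcast)).

% Assumes udp_broadcast_initialize/2 was called with scope(peers).

% Ground term: forwarded to all peers, no replies are collected.
announce(Event) :-
    broadcast(udp(peers, event(Event))).

% Nonground term with an explicit timeout: collect every answer that
% arrives within 2 seconds.
collect_numbers(Xs) :-
    findall(X, broadcast_request(udp(peers, number(X), 2)), Xs).

% Commit to the first reply (default timeout).  once/1 cuts the
% remaining choice points, which ends the request early.
first_number(X) :-
    once(broadcast_request(udp(peers, number(X)))).
```

Without any listener, announce/1 still succeeds, collect_numbers/1
yields the empty list and first_number/1 fails.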

An example of three separate processes   cooperating in the same _scope_
called `peers`:

==
Process A:

   ?- listen(number(X), between(1, 5, X)).
   true.

   ?-

Process B:

   ?- listen(number(X), between(7, 9, X)).
   true.

   ?-

Process C:

   ?- findall(X, broadcast_request(udp(peers, number(X))), Xs).
   Xs = [1, 2, 3, 4, 5, 7, 8, 9].

   ?-
==

It is also  possible  to  carry  on   a  private  dialog  with  a single
responder. To do this, you supply a   compound of the form, Term:PortId,
to a UDP scoped broadcast/1 or  broadcast_request/1, where PortId is the
ip-address and port-id of  the  intended   listener.  If  you  supply an
unbound variable, PortId, to broadcast_request, it  will be unified with
the address of the listener  that  responds   to  Term.  You  may send a
directed broadcast to a specific member by simply providing this address
in a similarly structured compound  to   a  UDP  scoped broadcast/1. The
message is sent via unicast to that member   only by way of the member's
broadcast listener. It is received by  the   listener  just as any other
broadcast would be. The listener does not know the difference.

For example, in order to discover who responded with a particular value:

==
Host B Process 1:

   ?- listen(number(X), between(1, 5, X)).
   true.

   ?-

Host A Process 1:

   ?- listen(number(X), between(7, 9, X)).
   true.

   ?-

Host A Process 2:

   ?- listen(number(X), between(1, 5, X)).
   true.

   ?- bagof(X, broadcast_request(udp(peers,number(X):From,1)), Xs).
   From = ip(192, 168, 1, 103):34855,
   Xs = [7, 8, 9] ;
   From = ip(192, 168, 1, 103):56331,
   Xs = [1, 2, 3, 4, 5] ;
   From = ip(192, 168, 1, 104):3217,
   Xs = [1, 2, 3, 4, 5].
==
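
The addresses discovered this way can be reused for directed
broadcasts. The following sketch, again assuming an initialized scope
`peers`, collects the responders and sends each of them one message.
The predicate greet_responders/0 and the message `hello` are
hypothetical.

```prolog
:- use_module(library(broadcast)).
:- use_module(library(lists)).

% Collect the address of every peer that answers number(X), then send
% a directed broadcast to each distinct address.
greet_responders :-
    findall(From,
            broadcast_request(udp(peers, number(_):From, 1)),
            Froms0),
    sort(Froms0, Froms),                % one entry per responder
    forall(member(From, Froms),
           broadcast(udp(peers, hello:From))).
```

Because both `hello` and `From` are ground in the second step, the
message is sent to that single listener only and no replies are
collected.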

All incoming traffic is handled by a single thread with the alias
`udp_inbound_proxy`. This thread also performs  the internal dispatching
using broadcast/1 and broadcast_request/1. Future   versions may provide
for handling these requests in separate threads.


## Caveats {#udp-broadcase-caveats}

While the implementation is mostly transparent, there are some important
and subtle differences that must be taken into consideration:

    * UDP broadcast requires an initialization step in order to
    launch the broadcast listener proxy. See
    udp_broadcast_initialize/2.

    * Prolog's broadcast_request/1 is nondet. It sends the request,
    then evaluates the replies synchronously, backtracking as needed
    until a satisfactory reply is received. The remaining potential
    replies are not evaluated.  With UDP, all peers will send all
    answers to the query.  The receiver may however stop listening.

    * A UDP broadcast/1 is completely asynchronous.

    * A UDP broadcast_request/1 is partially synchronous. A
    broadcast_request/1 is sent, then the sender blocks for a period of
    time (default: 250 ms) while the replies are collected. Any reply
    that is received after this period is silently discarded. An
    optional timeout argument, udp(Scope, Term, TimeOut), allows a
    sender to specify more (or less) time for replies.

    * Replies are presented to the user as a choice point on arrival,
    until the broadcast request timer finally expires. This
    allows traffic to propagate through the system faster and provides
    the requestor with the opportunity to terminate a broadcast request
    early if desired, by simply cutting choice points.

    * Please beware that broadcast request transactions remain active
    and resources consumed until broadcast_request finally fails on
    backtracking, an uncaught exception occurs, or until choice points
    are cut. Failure to properly manage this will likely result in
    chronic exhaustion of UDP sockets.

    * If a listener is connected to a generator that always succeeds
    (e.g. a random number generator), then the broadcast request will
    never terminate and trouble is bound to ensue.

    * broadcast_request/1 with =|udp_subnet|= scope is _not_ reentrant.
    If a listener performs a broadcast_request/1 with UDP scope
    recursively, then disaster looms certain. This caveat does not apply
    to a UDP scoped broadcast/1, which can safely be performed from a
    listener context.

    * UDP broadcast's capacity is not infinite. While it can tolerate
    substantial bursts of activity, it is designed for short bursts of
    small messages. Unlike TIPC, UDP is unreliable and has no QOS
    protections. Congestion is likely to cause trouble in the form of
    non-Byzantine failure. That is, late, lost (e.g. infinitely late),
    or duplicate datagrams. Caveat emptor.

    * A UDP broadcast_request/1 term that is ground is considered to
    be a broadcast only. No replies are collected unless there is at
    least one unbound variable to unify.

    * A UDP broadcast/1 always succeeds, even if there are no
    listeners.

    * A UDP broadcast_request/1 that receives no replies will fail.

    * Replies may be coming from many different places in the network
    (or none at all). No ordering of replies is implied.

    * Prolog terms are sent to others after first converting them to
    strings using term_string/3.  Serialization does not deal with
    cycles, attributes or sharing.  The hook udp_term_string_hook/3 may
    be defined to change the message serialization and support different
    message formats and/or encryption.

    * The broadcast model is based on anonymity and a presumption of
    trust--a perfect recipe for compromise. UDP is an Internet protocol.
    A UDP broadcast listener exposes a public port, which is
    static and shared by all listeners, and a private port, which is
    semi-static and unique to the listener instance. Both can be seen
    from off-cluster nodes and networks. Usage of this module exposes
    the node and consequently, the cluster to significant security
    risks. So have a care when designing your application. You must talk
    only to those who share and contribute to your concerns using a
    carefully prescribed protocol.

    * UDP broadcast categorically and silently ignores all message
    traffic originating from or terminating on nodes that are not
    members of the local subnet. This security measure only keeps honest
    people honest!
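
The subnet test in the last item relies on broadcast-address
arithmetic: a sender is accepted if its address, combined with the
subnet mask, yields the broadcast address of the scope. The stand-alone
sketch below mirrors that computation for illustration;
broadcast_address/3 and same_subnet/3 are hypothetical names and are not
exported by this library.

```prolog
% Set all host bits (the bits that are zero in Mask) of IP to one.
broadcast_address(ip(A1,A2,A3,A4), ip(M1,M2,M3,M4), ip(B1,B2,B3,B4)) :-
    B1 is A1 \/ (M1 xor 255),
    B2 is A2 \/ (M2 xor 255),
    B3 is A3 \/ (M3 xor 255),
    B4 is A4 \/ (M4 xor 255).

% A sender is in the subnet if it maps to the same broadcast address.
same_subnet(Sender, Mask, Broadcast) :-
    broadcast_address(Sender, Mask, Broadcast).
```

With mask 255.255.255.0, the address 192.168.1.103 maps to the
broadcast address 192.168.1.255. A sender at 192.168.1.104 passes the
test against that address, while a sender at 192.168.2.104 does not.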

@author    Jeffrey Rosenwald (JeffRose@acm.org), Jan Wielemaker
@license   BSD-2
@see       tipc.pl
*/

:- multifile
    udp_term_string_hook/3,                     % +Scope, ?Term, ?String
    udp_unicast_join_hook/3,                    % +Scope, +From, +Data
    black_list/1.                               % +Term

:- meta_predicate safely(0).

safely(Predicate) :-
    catch(Predicate, Err,
          (   Err == '$aborted'
          ->  !, fail
          ;   print_message(error, Err), fail
          )).

udp_broadcast_address(IPAddress, Subnet, BroadcastAddress) :-
    IPAddress = ip(A1, A2, A3, A4),
    Subnet = ip(S1, S2, S3, S4),
    BroadcastAddress = ip(B1, B2, B3, B4),

    B1 is A1 \/ (S1 xor 255),
    B2 is A2 \/ (S2 xor 255),
    B3 is A3 \/ (S3 xor 255),
    B4 is A4 \/ (S4 xor 255).

%!  udp_broadcast_service(?Scope, ?Address) is nondet.
%
%   Provides the UDP broadcast address for   a  given Scope. At present,
%   only one scope is supported, =|udp_subnet|=.

%!  udp_scope(?ScopeName, ?ScopeDef)

:- dynamic
    udp_scope/2,
    udp_scope_peer/2.
:- volatile
    udp_scope/2,
    udp_scope_peer/2.
%
%  Here's a UDP proxy to Prolog's broadcast library
%
%  A sender may extend a broadcast  to  a   subnet  of  a UDP network by
%  specifying a =|udp_subnet|= scoping qualifier   in his/her broadcast.
%  The qualifier has the effect of  selecting the appropriate multi-cast
%  address for the transmission. Thus,  the   sender  of the message has
%  control over the scope of his/her traffic on a per-message basis.
%
%  All in-scope listeners receive the   broadcast and simply rebroadcast
%  the message locally. All broadcast replies, if any, are sent directly
%  to the sender via the port-id that   was received with the broadcast.
%
%  Each listener exposes two UDP ports,  a   shared  public port that is
%  bound to a well-known port number and   a  private port that uniquely
%  identifies the listener. Broadcasts are received  on the public port
%  and replies are  sent  on  the   private  port.  Directed  broadcasts
%  (unicasts) are received on the private port   and replies are sent on
%  the private port.

%  Thread 1 listens for directed traffic on the private port.
%

:- dynamic
    udp_private_socket/3,                       % Port, Socket, FileNo
    udp_public_socket/4,                        % Scope, Port, Socket, FileNo
    udp_closed/1.                               % Scope

udp_inbound_proxy :-
    make_private_socket,
    forall(udp_scope(Scope, ScopeData),
           make_public_socket(ScopeData, Scope)),
    retractall(udp_closed(_)),
    findall(FileNo, udp_socket_file_no(FileNo), FileNos),
    catch(dispatch_inbound(FileNos),
          E, dispatch_exception(E)),
    udp_inbound_proxy.

dispatch_exception(E) :-
    E = error(_,_),
    !,
    print_message(warning, E).
dispatch_exception(_).


%!  make_private_socket is det.
%
%   Create our private socket. This socket is used for messages that are
%   directed to me. Note that we only  need this for broadcast networks.
%   If we use a unicast network we use   our public port to contact this
%   specific server.

make_private_socket :-
    udp_private_socket(_Port, S, _F),
    !,
    (   (   udp_scope(Scope, broadcast(_,_,_))
        ;   udp_scope(Scope, multicast(_,_))
        ),
        \+ udp_closed(Scope)
    ->  true
    ;   tcp_close_socket(S),
        retractall(udp_private_socket(_,_,_))
    ).
make_private_socket :-
    udp_scope(_, broadcast(_,_,_)),
    !,
    udp_socket(S),
    tcp_bind(S, Port),
    tcp_getopt(S, file_no(F)),
    tcp_setopt(S, broadcast),
    assertz(udp_private_socket(Port, S, F)).
make_private_socket :-
    udp_scope(_, multicast(_,_)),
    !,
    udp_socket(S),
    tcp_bind(S, Port),
    tcp_getopt(S, file_no(F)),
    assertz(udp_private_socket(Port, S, F)).
make_private_socket.

%!  make_public_socket(+ScopeData, +Scope)
%
%   Create the public port for Scope.

make_public_socket(_, Scope) :-
    udp_public_socket(Scope, _Port, S, _),
    !,
    (   udp_closed(Scope)
    ->  tcp_close_socket(S),
        retractall(udp_public_socket(Scope, _, _, _))
    ;   true
    ).
make_public_socket(broadcast(_SubNet, _Broadcast, Port), Scope) :-
    udp_socket(S),
    tcp_setopt(S, reuseaddr),
    tcp_bind(S, Port),
    tcp_getopt(S, file_no(F)),
    assertz(udp_public_socket(Scope, Port, S, F)).
make_public_socket(multicast(Group, Port), Scope) :-
    udp_socket(S),
    tcp_setopt(S, reuseaddr),
    tcp_bind(S, Port),
    tcp_setopt(S, ip_add_membership(Group)),
    tcp_getopt(S, file_no(F)),
    assertz(udp_public_socket(Scope, Port, S, F)).
make_public_socket(unicast(Port), Scope) :-
    udp_socket(S),
    tcp_bind(S, Port),
    tcp_getopt(S, file_no(F)),
    assertz(udp_public_socket(Scope, Port, S, F)).

udp_socket_file_no(FileNo) :-
    udp_private_socket(_,_,FileNo).
udp_socket_file_no(FileNo) :-
    udp_public_socket(_,_,_,FileNo).

%!  dispatch_inbound(+FileNos)
%
%   Dispatch inbound traffic. This loop   uses  wait_for_input/3 to wait
%   for one or more UDP sockets and   dispatches  the requests using the
%   internal broadcast service. For an incoming broadcast _request_ we
%   send the reply only to the  requester   and  therefore we must use a
%   socket that is not in broadcast mode.

dispatch_inbound(FileNos) :-
    debug(udp(broadcast), 'Waiting for ~p', [FileNos]),
    wait_for_input(FileNos, Ready, infinite),
    debug(udp(broadcast), 'Ready: ~p', [Ready]),
    maplist(dispatch_ready, Ready),
    dispatch_inbound(FileNos).

dispatch_ready(FileNo) :-
    udp_private_socket(_Port, Private, FileNo),
    !,
    udp_receive(Private, Data, From, [max_message_size(65535)]),
    debug(udp(broadcast), 'Inbound on private port', []),
    (   in_scope(Scope, From),
        udp_term_string(Scope, Term, Data) % only accept valid data
    ->  ld_dispatch(Private, Term, From, Scope)
    ;   true
    ).
dispatch_ready(FileNo) :-
    udp_public_socket(Scope, _PublicPort, Public, FileNo),
    !,
    udp_receive(Public, Data, From, [max_message_size(65535)]),
    debug(udp(broadcast), 'Inbound on public port from ~p for scope ~p',
          [From, Scope]),
    (   in_scope(Scope, From),
        udp_term_string(Scope, Term, Data) % only accept valid data
    ->  (   udp_scope(Scope, unicast(_))
        ->  ld_dispatch(Public, Term, From, Scope)
        ;   udp_private_socket(_PrivatePort, Private, _FileNo),
            ld_dispatch(Private, Term, From, Scope)
        )
    ;   udp_scope(Scope, unicast(_)),
        udp_term_string(Scope, Term, Data),
        unicast_out_of_scope_request(Scope, From, Term)
    ->  true
    ;   true
    ).

in_scope(Scope, Address) :-
    udp_scope(Scope, ScopeData),
    in_scope(ScopeData, Scope, Address),
    !.
in_scope(Scope, From) :-
    debug(udp(broadcast), 'Out-of-scope ~p datagram from ~p',
          [Scope, From]),
    fail.

in_scope(broadcast(Subnet, Broadcast, _PublicPort), _Scope, IP:_FromPort) :-
    udp_broadcast_address(IP, Subnet, Broadcast).
in_scope(multicast(_Group, _Port), _Scope, _From).
in_scope(unicast(_PublicPort), Scope, IP:_) :-
    udp_peer(Scope, IP:_).


%!  ld_dispatch(+PrivateSocket, +Term, +From, +Scope)
%
%   Locally dispatch Term received from From. If it concerns a broadcast
%   request, send the replies through PrivateSocket to From. The
%   multifile hook black_list/1 can be used to ignore certain messages.

ld_dispatch(_S, Term, From, _Scope) :-
    debug(udp(broadcast), 'ld_dispatch(~p) from ~p', [Term, From]),
    fail.
ld_dispatch(_S, Term, _From, _Scope) :-
    blacklisted(Term), !.
ld_dispatch(S, request(Key, Term), From, Scope) :-
    !,
    forall(safely(broadcast_request(Term)),
           safely((udp_term_string(Scope, reply(Key,Term), Message),
                   udp_send(S, Message, From, [])))).
ld_dispatch(_S, send(Term), _From, _Scope) :-
    safely(broadcast(Term)).
ld_dispatch(_S, reply(Key, Term), From, _Scope) :-
    (   reply_queue(Key, Queue)
    ->  safely(thread_send_message(Queue, Term:From))
    ;   true
    ).

blacklisted(send(Term))      :- black_list(Term).
blacklisted(request(_,Term)) :- black_list(Term).
blacklisted(reply(_,Term))   :- black_list(Term).


%!  reload_udp_proxy
%
%   Update the UDP relaying proxy service.   The proxy consists of three
%   forwarding mechanisms:
%
%     - Listen on our _scope_.  If any messages are received, hand them
%       to udp_broadcast/3 to be broadcasted to _scope_ or sent to a
%       specific recipient.
%     - Listen on the _scope_ public port. Incoming messages are
%       relayed to the internal broadcast mechanism and replies are sent
%       from our private socket.
%     - Listen on our private port and reply using the same port.

reload_udp_proxy :-
    reload_outbound_proxy,
    reload_inbound_proxy.

reload_outbound_proxy :-
    listening(udp_broadcast, udp(_,_), _),
    !.
reload_outbound_proxy :-
    listen(udp_broadcast, udp(Scope,Message),
           udp_broadcast(Message, Scope, 0.25)),
    listen(udp_broadcast, udp(Scope,Message,Timeout),
           udp_broadcast(Message, Scope, Timeout)),
    listen(udp_broadcast, udp_subnet(Message),  % backward compatibility
           udp_broadcast(Message, subnet, 0.25)),
    listen(udp_broadcast, udp_subnet(Message,Timeout),
           udp_broadcast(Message, subnet, Timeout)).

reload_inbound_proxy :-
    catch(thread_signal(udp_inbound_proxy, throw(udp_reload)),
          error(existence_error(thread, _),_),
          fail),
    !.
reload_inbound_proxy :-
    thread_create(udp_inbound_proxy, _,
                  [ alias(udp_inbound_proxy),
                    detached(true)
                  ]).

%!  udp_broadcast_close(+Scope)
%
%   Close a UDP broadcast scope.

udp_broadcast_close(Scope) :-
    udp_scope(Scope, _ScopeData),
    !,
    assert(udp_closed(Scope)),
    reload_udp_proxy.
udp_broadcast_close(_).


%!  udp_broadcast(+What, +Scope, +TimeOut)
%
%   Send a broadcast request to my UDP peers in Scope. What is either of
%   the shape `Term:Address` to send Term to a specific address or query
%   the address from which term is answered or it is a plain `Term`.
%
%   If `Term` is nonground, it is considered a _request_ (see
%   broadcast_request/1) and the predicate  succeeds   for  each  answer
%   received within TimeOut seconds. If Term is ground it is considered
%   an asynchronous broadcast and udp_broadcast/3 is deterministic.

udp_broadcast(Term:To, Scope, _Timeout) :-
    ground(Term), ground(To),           % broadcast to single listener
    !,
    udp_basic_broadcast(send(Term), Scope, single(To)).
udp_broadcast(Term, Scope, _Timeout) :-
    ground(Term),                       % broadcast to all listeners
    !,
    udp_basic_broadcast(send(Term), Scope, broadcast).
udp_broadcast(Term:To, Scope, Timeout) :-
    ground(To),                         % request to single listener
    !,
    setup_call_cleanup(
        request_queue(Id, Queue),
        ( udp_basic_broadcast(request(Id, Term), Scope, single(To)),
          udp_br_collect_replies(Queue, Timeout, Term:To)
        ),
        destroy_request_queue(Queue)).
udp_broadcast(Term:From, Scope, Timeout) :-
    !,                                  % request to all listeners, collect sender
    setup_call_cleanup(
        request_queue(Id, Queue),
        ( udp_basic_broadcast(request(Id, Term), Scope, broadcast),
          udp_br_collect_replies(Queue, Timeout, Term:From)
        ),
        destroy_request_queue(Queue)).
udp_broadcast(Term, Scope, Timeout) :-  % request to all listeners
    udp_broadcast(Term:_, Scope, Timeout).

:- dynamic
    reply_queue/2.

request_queue(Id, Queue) :-
    Id is random(1<<63),
    message_queue_create(Queue),
    asserta(reply_queue(Id, Queue)).

destroy_request_queue(Queue) :-         % leave queue to GC
    retractall(reply_queue(_, Queue)).


%!  udp_basic_broadcast(+Term, +Scope, +Dest) is multi.
%
%   Create a UDP private socket and use it   to send Term to Address. If
%   Address is our broadcast address, set the socket in broadcast mode.
%
%   This predicate succeeds with a choice   point. Committing the choice
%   point closes S.
%
%   @arg Dest is one of single(Target) or `broadcast`.

udp_basic_broadcast(Term, Scope, Dest) :-
    debug(udp(broadcast), 'UDP proxy outbound ~p to ~p', [Term, Dest]),
    udp_term_string(Scope, Term, String),
    udp_send_message(Dest, String, Scope).

udp_send_message(single(Address), String, Scope) :-
    (   udp_scope(Scope, unicast(_))
    ->  udp_public_socket(Scope, _Port, S, _)
    ;   udp_private_socket(_Port, S, _F)
    ),
    safely(udp_send(S, String, Address, [])).
udp_send_message(broadcast, String, Scope) :-
    (   udp_scope(Scope, unicast(_))
    ->  udp_public_socket(Scope, _Port, S, _),
        forall(udp_peer(Scope, Address),
               ( debug(udp(broadcast), 'Unicast to ~p', [Address]),
                 safely(udp_send(S, String, Address, []))))
    ;   udp_scope(Scope, broadcast(_SubNet, Broadcast, Port))
    ->  udp_private_socket(_PrivatePort, S, _F),
        udp_send(S, String, Broadcast:Port, [])
    ;   udp_scope(Scope, multicast(Group, Port))
    ->  udp_private_socket(_PrivatePort, S, _F),
        udp_send(S, String, Group:Port, [])
    ).

%!  udp_br_collect_replies(+Queue, +TimeOut, -TermAndFrom) is nondet.
%
%   Collect replies on Queue for TimeOut seconds. Succeed for each
%   received message.

udp_br_collect_replies(Queue, Timeout, Reply) :-
    get_time(Start),
    Deadline is Start+Timeout,
    repeat,
       (   thread_get_message(Queue, Reply,
                              [ deadline(Deadline)
                              ])
       ->  true
       ;   !,
           fail
       ).

%!  udp_broadcast_initialize(+IPAddress, +Options) is semidet.
%
%   Initialize the UDP broadcast bridge. IPAddress is the IP address on
%   the network we want to broadcast on. IP addresses are terms
%   ip(A,B,C,D) or an atom or string of the format =|A.B.C.D|=. Options
%   processed:
%
%     - scope(+ScopeName)
%     Name of the scope.  Default is `subnet`.
%     - subnet_mask(+SubNet)
%     Subnet to broadcast on.  This uses the same syntax as IPAddress.
%     Default classifies the network as class A, B or C depending on
%     the first octet and applies the default mask.
%     - port(+Port)
%     Public port to use.  Default is 20005.
%     - method(+Method)
%     Method to send a message to multiple peers.  One of
%       - broadcast
%       Use UDP broadcast messages to the LAN.  This is the
%       default.
%       - multicast
%       Use UDP multicast messages.  This can be used on WAN networks,
%       provided the intermediate routers understand multicast.
%       - unicast
%       Send the messages individually to all registered peers.
%
%   For compatibility reasons Options may be the subnet mask.

udp_broadcast_initialize(IP, Options) :-
    with_mutex(udp_broadcast,
               udp_broadcast_initialize_sync(IP, Options)).

udp_broadcast_initialize_sync(IP, Options) :-
    nonvar(Options),
    Options = ip(_,_,_,_),
    !,
    udp_broadcast_initialize(IP, [subnet_mask(Options)]).
udp_broadcast_initialize_sync(IP, Options) :-
    to_ip4(IP, IPAddress),
    option(method(Method), Options, broadcast),
    must_be(oneof([broadcast, multicast, unicast]), Method),
    udp_broadcast_initialize_sync(Method, IPAddress, Options),
    reload_udp_proxy.

udp_broadcast_initialize_sync(broadcast, IPAddress, Options) :-
    option(subnet_mask(Subnet), Options, _),
    mk_subnet(Subnet, IPAddress, Subnet4),
    option(port(Port), Options, 20005),
    option(scope(Scope), Options, subnet),

    udp_broadcast_address(IPAddress, Subnet4, Broadcast),
    udp_broadcast_close(Scope),
    assertz(udp_scope(Scope, broadcast(Subnet4, Broadcast, Port))).
udp_broadcast_initialize_sync(unicast, _IPAddress, Options) :-
    option(port(Port), Options, 20005),
    option(scope(Scope), Options, subnet),
    udp_broadcast_close(Scope),
    assertz(udp_scope(Scope, unicast(Port))).
udp_broadcast_initialize_sync(multicast, IPAddress, Options) :-
    option(port(Port), Options, 20005),
    option(scope(Scope), Options, subnet),
    udp_broadcast_close(Scope),
    multicast_address(IPAddress),
    assertz(udp_scope(Scope, multicast(IPAddress, Port))).

to_ip4(Atomic, ip(A,B,C,D)) :-
    atomic(Atomic),
    !,
    (   split_string(Atomic, ".", "", Strings),
        maplist(number_string, [A,B,C,D], Strings)
    ->  true
    ;   syntax_error(illegal_ip_address)
    ).
to_ip4(IP, IP).

mk_subnet(Var, IP, Subnet) :-
    var(Var),
    !,
    (   default_subnet(IP, Subnet)
    ->  true
    ;   domain_error(ip_with_subnet, IP)
    ).
mk_subnet(Subnet, _, Subnet4) :-
    to_ip4(Subnet, Subnet4).

default_subnet(ip(A,_,_,_), ip(A,0,0,0)) :-
    between(1,126, A), !.
default_subnet(ip(A,B,_,_), ip(A,B,0,0)) :-
    between(128,191, A), !.
default_subnet(ip(A,B,C,_), ip(A,B,C,0)) :-
    between(192,223, A), !.

multicast_address(ip(A,_,_,_)) :-
    between(224, 239, A),
    !.
multicast_address(IP) :-
    domain_error(multicast_network, IP).


  764		 /*******************************
  765		 *          UNICAST PEERS	*
  766		 *******************************/
  767
  768%!  udp_peer_add(+Scope, +Address) is det.
  769%!  udp_peer_del(+Scope, ?Address) is det.
  770%!  udp_peer(?Scope, ?Address) is nondet.
  771%
  772%   Manage and query the set  of  known   peers  for  a unicast network.
  773%   Address is either a term  IP:Port  or   a  plain  IP address. In the
  774%   latter case the default port registered with the scope is used.
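%
%   A minimal usage sketch of the three predicates. The scope name
%   `demo`, the address and the predicate peer_demo/0 are assumptions
%   made for illustration. The example passes an explicit IP:Port,
%   because the plain IP form raises an existence error unless Scope
%   has been initialized as a unicast network.
%
%   ```
%   :- use_module(library(udp_broadcast)).
%
%   peer_demo :-
%       udp_peer_add(demo, "10.0.0.7":20005),
%       udp_peer(demo, Peer),
%       Peer == ip(10,0,0,7):20005,     % stored in canonical form
%       udp_peer_del(demo, "10.0.0.7":20005),
%       \+ udp_peer(demo, _).           % no peers left in this scope
%   ```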
%
%   @arg Address has canonical form ip(A,B,C,D):Port.

udp_peer_add(Scope, Address) :-
    must_be(ground, Address),
    peer_address(Address, Scope, Canonical),
    (   udp_scope_peer(Scope, Canonical)
    ->  true
    ;   assertz(udp_scope_peer(Scope, Canonical))
    ).

udp_peer_del(Scope, Address) :-
    peer_address(Address, Scope, Canonical),
    retractall(udp_scope_peer(Scope, Canonical)).

udp_peer(Scope, IPAddress) :-
    udp_scope_peer(Scope, IPAddress).

peer_address(IP:Port, _Scope, IPAddress:Port) :-
    !,
    to_ip4(IP, IPAddress).
peer_address(IP, Scope, IPAddress:Port) :-
    (   udp_scope(Scope, unicast(Port))
    ->  true
    ;   existence_error(udp_scope, Scope)
    ),
    to_ip4(IP, IPAddress).



		 /*******************************
		 *             HOOKS		*
		 *******************************/

%!  udp_term_string_hook(+Scope, +Term, -String) is det.
%!  udp_term_string_hook(+Scope, -Term, +String) is semidet.
%
%   Hook for serializing the message Term. The default writes
%   =|%-prolog-\n|=, followed by the Prolog term in quoted notation
%   while ignoring operators. This hook may use alternative
%   serialization such as fast_term_serialized/2, use library(ssl) to
%   realise encrypted messages, etc.
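%
%   A minimal sketch of an alternative serialization, assuming a scope
%   named `my_scope` and an application specific magic prefix
%   `myapp1\n` (both hypothetical). It serves both modes: if String is
%   unbound it serializes Term, otherwise it parses String and fails
%   silently on a foreign prefix or a syntax error. Unlike the default,
%   this sketch writes operators as operators, so it assumes that all
%   peers share the same operator declarations.
%
%   ```
%   :- multifile udp_broadcast:udp_term_string_hook/3.
%
%   udp_broadcast:udp_term_string_hook(my_scope, Term, String) :-
%       (   var(String)
%       ->  % serialize: write the prefix and the quoted term
%           format(string(String), 'myapp1\n~q', [Term])
%       ;   % parse: strip the prefix and read the remainder
%           string_concat("myapp1\n", Text, String),
%           atom_string(Atom, Text),
%           catch(term_to_atom(Term, Atom), _, fail)
%       ).
%   ```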
%
%   @arg Scope is the scope for which the message is broadcast.  This
%   can be used to use different serialization for different scopes.
%   @arg Term encapsulates the term broadcast by the application as
%   follows:
%
%     - send(ApplTerm)
%       Is sent by broadcast(udp(Scope, ApplTerm))
%     - request(Id,ApplTerm)
%       Is sent by broadcast_request/1, where Id is a unique large
%       (64 bit) integer.
%     - reply(Id,ApplTerm)
%       Is sent to reply to a broadcast_request/1 request that has
%       been received.  Arguments are the same as above.

%!  udp_term_string(+Scope, +Term, -String) is det.
%!  udp_term_string(+Scope, -Term, +String) is semidet.
%
%   Serialize an arbitrary Prolog  term  as   a  string.  The  string is
%   prefixed by a magic key to ensure   we only accept messages that are
%   meant for us.
%
%   In mode (+,-), Term is written with the options ignore_ops(true) and
%   quoted(true).
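%
%   The resulting encoding can be sketched as below.
%   default_wire_format/2 and wire_format_demo/0 are hypothetical
%   helpers that mirror the format/3 call of the fallback clause. They
%   are not part of this library.
%
%   ```
%   default_wire_format(Term, String) :-
%       format(string(String), '%-prolog-\n~W',
%              [ Term, [ignore_ops(true), quoted(true)] ]).
%
%   wire_format_demo :-
%       default_wire_format(send(hello('World')), String),
%       % the magic key precedes the quoted payload
%       string_concat("%-prolog-\n", Payload, String),
%       Payload == "send(hello('World'))".
%   ```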
%
%   This predicate first calls  udp_term_string_hook/3.

udp_term_string(Scope, Term, String) :-
    udp_term_string_hook(Scope, Term, String),
    !.
udp_term_string(_Scope, Term, String) :-
    (   var(String)
    ->  format(string(String), '%-prolog-\n~W',
               [ Term,
                 [ ignore_ops(true),
                   quoted(true)
                 ]
               ])
    ;   sub_string(String, 0, _, _, '%-prolog-\n'),
        term_string(Term, String,
                    [ syntax_errors(quiet)
                    ])
    ).

%!  unicast_out_of_scope_request(+Scope, +From, +Data) is semidet.

%!  udp_unicast_join_hook(+Scope, +From, +Data) is semidet.
%
%   This multifile hook is called if a UDP packet is received on the
%   port of the unicast network identified by Scope. From is the origin
%   IP and port and Data is the message data that is deserialized as
%   defined for the scope (see udp_term_string/3).
%
%   This hook is intended to initiate a new node joining the network of
%   peers. We could in theory also omit the in-scope test and use a
%   normal broadcast to join. Using a different channel however provides
%   a basic level of security. A possible implementation is below. The
%   first fragment is a hook added to the server, the second is a
%   predicate added to a client and the last initiates the request in
%   the client. The exchanged term (join(X)) can be used to exchange a
%   welcome handshake.
%
%   ```
%   :- multifile udp_broadcast:udp_unicast_join_hook/3.
%   udp_broadcast:udp_unicast_join_hook(Scope, From, join(welcome)) :-
%       udp_peer_add(Scope, From).
%   ```
%
%   ```
%   join_request(Scope, Address, Reply) :-
%       udp_peer_add(Scope, Address),
%       broadcast_request(udp(Scope, join(Reply))).
%   ```
%
%   ```
%   ?- join_request(myscope, "1.2.3.4":10001, Reply).
%   Reply = welcome.
%   ```

unicast_out_of_scope_request(Scope, From, send(Term)) :-
    udp_unicast_join_hook(Scope, From, Term).
unicast_out_of_scope_request(Scope, From, request(Key, Term)) :-
    udp_unicast_join_hook(Scope, From, Term),
    udp_public_socket(Scope, _Port, Socket, _FileNo),
    safely((udp_term_string(Scope, reply(Key,Term), Message),
            udp_send(Socket, Message, From, [])))