View source with formatted comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jeffrey Rosenwald, Jan Wielemaker
    4    E-mail:        jeffrose@acm.org
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2009-2018, Jeffrey Rosenwald
    7                   CWI, Amsterdam
    8    All rights reserved.
    9
   10    Redistribution and use in source and binary forms, with or without
   11    modification, are permitted provided that the following conditions
   12    are met:
   13
   14    1. Redistributions of source code must retain the above copyright
   15       notice, this list of conditions and the following disclaimer.
   16
   17    2. Redistributions in binary form must reproduce the above copyright
   18       notice, this list of conditions and the following disclaimer in
   19       the documentation and/or other materials provided with the
   20       distribution.
   21
   22    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   23    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   24    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   25    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   26    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   27    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   28    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   29    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   30    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   31    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   32    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   33    POSSIBILITY OF SUCH DAMAGE.
   34*/
   35
   36:- module(paxos,
   37          [ paxos_get/1,                        % ?Term
   38            paxos_get/2,                        % +Key, -Value
   39            paxos_get/3,                        % +Key, -Value, +Options
   40            paxos_set/1,                        % ?Term
   41            paxos_set/2,                        % +Key, +Value
   42            paxos_set/3,                        % +Key, +Value, +Options
   43            paxos_on_change/2,                  % ?Term, +Goal
   44            paxos_on_change/3,                  % ?Key, ?Value, +Goal
   45
   46            paxos_initialize/1,			% +Options
   47                                                % Hook support
   48            paxos_replicate_key/3               % +Nodes, ?Key, +Options
   49          ]).   50:- use_module(library(broadcast)).   51:- use_module(library(debug)).   52:- use_module(library(lists)).   53:- use_module(library(settings)).   54:- use_module(library(option)).   55:- use_module(library(error)).   56:- use_module(library(apply)).   57:- use_module(library(solution_sequences)).   58
   59/** <module> A Replicated Data Store
   60
   61This module provides a replicated data store that is coordinated using a
    62variation on Lamport's Paxos consensus protocol.  The original method is
   63described in his paper entitled, "The   Part-time Parliament", which was
   64published in 1998. The algorithm is   tolerant of non-Byzantine failure.
   65That is late or lost delivery or   reply,  but not senseless delivery or
   66reply. The present algorithm takes advantage  of the convenience offered
   67by multicast to the quorum's membership,   who  can remain anonymous and
    68who can come and go as they  please without affecting Liveness or Safety
   69properties.
   70
   71Paxos' quorum is a set of one or more attentive members, whose processes
   72respond to queries within some known time limit (< 20ms), which includes
   73roundtrip delivery delay. This property is   easy  to satisfy given that
   74every coordinator is necessarily a member of   the quorum as well, and a
   75quorum of one is  permitted.  An   inattentive  member  (e.g.  one whose
   76actions are late or lost) is deemed to be "not-present" for the purposes
   77of the present transaction and consistency   cannot  be assured for that
   78member. As long as there is at least one attentive member of the quorum,
   79then persistence of the database is assured.
   80
   81Each member maintains a ledger  of   terms  along with information about
   82when  they  were   originally   recorded.    The   member's   ledger  is
   83deterministic. That is to say  that  there   can  only  be one entry per
   84functor/arity combination. No member will  accept   a  new term proposal
   85that has a line number that is equal-to   or  lower-than the one that is
   86already recorded in the ledger.
   87
   88Paxos is a three-phase protocol:
   89
   90   1: A coordinator first prepares the quorum for a new proposal by
   91   broadcasting a proposed term. The quorum responds by returning the
   92   last known line number for that functor/arity combination that is
   93   recorded in their respective ledgers.
   94
   95   2: The coordinator selects the highest line number it receives,
   96   increments it by one, and then asks the quorum to finally accept the
   97   new term with the new line number. The quorum checks their respective
   98   ledgers once again and if there is still no other ledger entry for
   99   that functor/arity combination that is equal-to or higher than the
  100   specified line, then each member records the term in the ledger at
  101   the specified line. The member indicates consent by returning the
  102   specified line number back to the coordinator. If consent is withheld
  103   by a member, then the member returns a =nack= instead. The
  104   coordinator requires unanimous consent. If it isn't achieved then the
  105   proposal fails and the coordinator must start over from the
  106   beginning.
  107
  108   3: Finally, the coordinator concludes the successful negotiation by
  109   broadcasting the agreement to the quorum in the form of a
   110   paxos(changed(Key,Value)) event. This is the only event that
  111   should be of interest to user programs.
  112
  113For practical reasons, we rely  on   the  partially synchronous behavior
  114(e.g. limited upper time bound for  replies) of broadcast_request/1 over
  115TIPC to ensure Progress. Perhaps more importantly,   we rely on the fact
  116that the TIPC broadcast listener state  machine guarantees the atomicity
  117of broadcast_request/1 at the process level, thus obviating the need for
  118external mutual exclusion mechanisms.
  119
  120_|Note that this algorithm does not guarantee the rightness of the value
  121proposed. It only guarantees that if   successful, the value proposed is
  122identical for all attentive members of the quorum.|_
  123
  124@author    Jeffrey Rosenwald (JeffRose@acm.org)
  125@license   BSD-2
  126@see       tipc_broadcast.pl, udp_broadcast.pl
  127*/
  128
  129:- meta_predicate
  130    paxos_on_change(?, 0),
  131    paxos_on_change(?, ?, 0).  132
  133:- multifile
  134    paxos_message_hook/3.               % +PaxOS, +TimeOut, -Message
  135
  136:- setting(max_sets, nonneg, 20,
  137           "Max Retries to get to an agreement").  138:- setting(max_gets, nonneg, 5,
  139           "Max Retries to get a value from the forum").  140:- setting(response_timeout, float, 0.020,
  141           "Max time to wait for a response").  142:- setting(replication_rate, number, 1000,
  143           "Number of keys replicated per second").  144:- setting(death_half_life, number, 10,
  145           "Half-time for failure score").  146:- setting(death_score, number, 100,
  147           "Number of keys replicated per second").  148
  149
  150%!  paxos_initialize(+Options) is det.
  151%
  152%   Initialize this Prolog process as a   paxos node. The initialization
  153%   requires an initialized and configured TIPC,  UDP or other broadcast
  154%   protocol. Calling this initialization may be  omitted, in which case
  155%   the equivant of paxos_initialize([]) is executed   lazily as part of
  156%   the first paxos operation.  Defined options:
  157%
  158%     - node(?NodeID)
  159%     When instantiated, this node rejoins the network with the given
  160%     node id. A fixed node idea should be used if the node is
  161%     configured for persistency and causes the new node to receive
  162%     updates for keys that have been created or modified since the
  163%     node left the network.  If NodeID is a variable it is unified
  164%     with the discovered NodeID.
  165%
  166%     NodeID must be a small non-negative integer as these identifiers
  167%     are used in bitmaps.
  168
  169:- dynamic  paxos_initialized/0.  170:- volatile paxos_initialized/0.  171
  172paxos_initialize(_Options) :-
  173    paxos_initialized,
  174    !.
  175paxos_initialize(Options) :-
  176    with_mutex(paxos, paxos_initialize_sync(Options)).
  177
  178paxos_initialize_sync(_Options) :-
  179    paxos_initialized,
  180    !.
  181paxos_initialize_sync(Options) :-
  182    at_halt(paxos_leave),
  183    listen(paxos, paxos(X), paxos_message(X)),
  184    paxos_assign_node(Options),
  185    start_replicator,
  186    asserta(paxos_initialized).
  187
  188paxos_initialize :-
  189    paxos_initialize([]).
  190
  191
  192		 /*******************************
  193		 *            ADMIN		*
  194		 *******************************/
  195
  196%!  paxos_get_admin(+Name, -Value) is semidet.
  197%!  paxos_set_admin(+Name, +Value) is semidet.
  198%
  199%   Set administrative keys. We use a wrapper  such that we can hide the
  200%   key identity.
  201
  202admin_key(quorum, '$paxos_quorum').
  203admin_key(dead,  '$paxos_dead_nodes').
  204
  205paxos_get_admin(Name, Value) :-
  206    admin_key(Name, Key),
  207    paxos_get(Key, Value).
  208
  209paxos_set_admin(Name, Value) :-
  210    admin_key(Name, Key),
  211    paxos_set(Key, Value).
  212
  213paxos_set_admin_bg(Name, Value) :-
  214    thread_create(ignore(paxos_set_admin(Name, Value)), _,
  215                  [ detached(true)
  216                  ]).
  217
  218
  219		 /*******************************
  220		 *           NODE DATA		*
  221		 *******************************/
  222
  223%!  node(?NodeId).
  224%!  quorum(?Bitmap).
  225%!  dead(?Bitmap).
  226%!  failed(?Bitmap).
  227%!  failed(?NodeId, ?LastTried, ?Score).
  228%
  229%   Track our identity as well as as  the   status  of  our peers in the
  230%   network. NodeId is a small integer. Multiple NodeIds are combined in
  231%   a Bitmap.
  232%
  233%     - node/1 is our identity.
  234%     - quorum/1 is the set of members of the quorum
  235%     - failed/1 is the set of members for which the last message was
  236%       not confirmed.
  237%     - failed/3 tracks individual failed nodes. If accumulates failures
  238%       until the node is marked _dead_.
  239%     - dead/1 is the set of members that is considered dead.
  240
  241:- dynamic
  242    node/1,                             % NodeID
  243    quorum/1,                           % Bitmap
  244    failed/1,                           % Bitmap
  245    failed/3,                           % NodeID, LastTried, Score
  246    leaving/0,                          % Node is leaving
  247    dead/1,                             % Bitmap
  248    salt/1.                             % Unique key
  249:- volatile
  250    node/1,
  251    quorum/1,
  252    failed/1,
  253    failed/3,
  254    leaving/0,
  255    dead/1,
  256    salt/1.  257
  258%!  paxos_assign_node(+Options) is det.
  259%
  260%   Assign a node for this  paxos  instance.   If  node  is  given as an
  261%   option, this is the node id that   is used. Otherwise the network is
  262%   analysed and the system selects a new node.
  263
  264paxos_assign_node(Options) :-
  265    (   option(node(Node), Options)
  266    ->  node(Node)
  267    ;   node(_)
  268    ),                                          % already done
  269    !.
  270paxos_assign_node(Options) :-
  271    between(1, 20, Retry),
  272    option(node(Node), Options, Node),
  273    (   node(_)
  274    ->  permission_error(set, paxos_node, Node)
  275    ;   true
  276    ),
  277    retractall(dead(_)),
  278    retractall(quorum(_)),
  279    retractall(failed(_)),
  280    retractall(failed(_,_,_)),
  281    retractall(leaving),
  282    Salt is random(1<<63),
  283    asserta(salt(Salt)),
  284    paxos_message(node(N,Q,D):From, 0.25, NodeQuery),
  285    findall(t(N,Q,D,From),
  286            broadcast_request(NodeQuery),
  287            Network),
  288    select(t(self,0,Salt,Me), Network, AllNodeStatus),
  289    partition(starting, AllNodeStatus, Starting, Running),
  290    nth_starting(Starting, Salt, Offset),
  291    retractall(salt(_)),
  292    debug(paxos(node), 'Me@~p; starting: ~p; running: ~p',
  293          [Me, Starting, Running]),
  294    arg_union(2, Running, Quorum),
  295    arg_union(3, Running, Dead),
  296    (   var(Node)
  297    ->  (   call_nth(( between(0, 1000, Node),
  298                       \+ memberchk(t(Node,_,_,_), Running),
  299                       Dead /\ (1<<Node) =:= 0),
  300                     Offset)
  301        ->  debug(paxos(node), 'Assigning myself node ~d', [Node])
  302        ;   resource_error(paxos_nodes)
  303        )
  304    ;   memberchk(t(Node,_,_,_), Running)
  305    ->  permission_error(set, paxos_node, Node)
  306    ;   Rejoin = true
  307    ),
  308    asserta(node(Node)),
  309    (   claim_node(Node, Me)
  310    ->  !,
  311        asserta(dead(Dead)),
  312        set_quorum(Node, Quorum),
  313        (   Rejoin == true
  314        ->  paxos_rejoin
  315        ;   true
  316        )
  317    ;   debug(paxos(node), 'Node already claimed; retrying (~p)', [Node, Retry]),
  318        retractall(node(Node)),
  319        fail
  320    ).
  321
  322starting(t(self,_Quorum,_Salt,_Address)).
  323
  324nth_starting(Starting, Salt, N) :-
  325    maplist(arg(3), Starting, Salts),
  326    sort([Salt|Salts], Sorted),
  327    nth1(N, Sorted, Salt),
  328    !.
  329
  330claim_node(Node, Me) :-
  331    paxos_message(claim_node(Node, Ok):From, 0.25, NodeQuery),
  332    forall((   broadcast_request(NodeQuery),
  333               From \== Me,
  334               debug(paxos(node), 'Claim ~p ~p: ~p', [Node, From, Ok])
  335           ),
  336           Ok == true).
  337
  338set_quorum(Node, Quorum0) :-
  339    Quorum is Quorum0 \/ (1<<Node),
  340    debug(paxos(node), 'Adding ~d to quorum (now 0x~16r)', [Node, Quorum]),
  341    asserta(quorum(Quorum)),
  342    paxos_set_admin(quorum, Quorum).
  343
  344
  345%!  paxos_rejoin
  346%
  347%   Re-join the network.  Tasks:
  348%
  349%     - Remove myself from the dead list if I'm on there
  350%     - Tell the replicators we lost everything.
  351
  352paxos_rejoin :-
  353    node(Node),
  354    repeat,
  355        paxos_get_admin(dead, Dead0),
  356        Dead is Dead0 /\ \(1<<Node),
  357        (   Dead == Dead0
  358        ->  true
  359        ;   paxos_set_admin(dead, Dead)
  360        ),
  361    !.
  362
  363%!  paxos_leave is det.
  364%!  paxos_leave(+Node) is det.
  365%
  366%   Leave the network.  The  predicate   paxos_leave/0  is  called  from
  367%   at_halt/1 to ensure the node is  deleted   as  the process dies. The
  368%   paxos_leave/1 version is called  to  discard   other  nodes  if they
  369%   repeatedly did not respond to queries.
  370
  371paxos_leave :-
  372    node(Node),
  373    !,
  374    asserta(leaving),
  375    paxos_leave(Node),
  376    Set is 1<<Node,
  377    paxos_message(forget(Set), -, Forget),
  378    broadcast(Forget),
  379    unlisten(paxos),
  380    retractall(leaving).
  381paxos_leave.
  382
  383paxos_leave(Node) :-
  384    !,
  385    paxos_update_set(quorum, del(Node)),
  386    paxos_update_set(dead,   add(Node)).
  387paxos_leave(_).
  388
  389paxos_update_set(Set, How) :-
  390    repeat,
  391      Term =.. [Set,Value],
  392      call(Term),
  393      (   How = add(Node)
  394      ->  NewValue is Value \/  (1<<Node)
  395      ;   How = del(Node)
  396      ->  NewValue is Value /\ \(1<<Node)
  397      ),
  398      (   Value == NewValue
  399      ->  true
  400      ;   paxos_set_admin(Set, NewValue)
  401      ),
  402    !.
  403
  404		 /*******************************
  405		 *          NODE STATUS		*
  406		 *******************************/
  407
  408%!  update_failed(+Action, +Quorum, +Alive) is det.
  409%
  410%   We just sent the Quorum a  message  and   got  a  reply from the set
  411%   Alive.
  412%
  413%   @arg is one of `set`, `get` or `replicate` and indicates the
  414%   intended action.
  415
  416update_failed(Action, Quorum, Alive) :-
  417    Failed is Quorum /\ \Alive,
  418    alive(Alive),
  419    consider_dead(Failed),
  420    (   failed(Failed)
  421    ->  true
  422    ;   (   clause(failed(_Old), true, Ref)
  423        ->  asserta(failed(Failed)),
  424            erase(Ref),
  425            debug(paxos(node), 'Updated failed quorum to 0x~16r', [Failed])
  426        ;   asserta(failed(Failed))
  427        ),
  428        (   Action == set
  429        ->  start_replicator
  430        ;   true
  431        )
  432    ).
  433
  434consider_dead(0) :-
  435    !.
  436consider_dead(Failed) :-
  437    Node is lsb(Failed),
  438    consider_dead1(Node),
  439    Rest is Failed /\ \(1<<Node),
  440    consider_dead(Rest).
  441
  442consider_dead1(Node) :-
  443    clause(failed(Node, Last, Score), true, Ref),
  444    !,
  445    setting(death_half_life, HalfLife),
  446    setting(death_score, DeathScore),
  447    get_time(Now),
  448    Passed is Now-Last,
  449    NewScore is Score*(2**(-Passed/HalfLife)) + 10,
  450    asserta(failed(Node, Now, NewScore)),
  451    erase(Ref),
  452    (   NewScore < DeathScore
  453    ->  debug(paxos(node), 'Consider node ~d dead', [Node]),
  454        paxos_leave(Node)
  455    ;   true
  456    ).
  457consider_dead1(Node) :-
  458    get_time(Now),
  459    asserta(failed(Node, Now, 10)).
  460
  461alive(Bitmap) :-
  462    (   clause(failed(Node, _Last, _Score), true, Ref),
  463        Bitmap /\ (1<<Node) =\= 0,
  464        erase(Ref),
  465        fail
  466    ;   true
  467    ).
  468
  469
  470%!  life_quorum(-Quorum, -LifeQuorum) is det.
  471%
  472%   Find the Quorum and the living nodes   from  the Quorum. This is the
  473%   set for which we wait.  If  the   LifeQuorum  is  not  a majority we
  474%   address the whole Quorum.
  475%
  476%   @tbd At some point in time we must remove a node from the quorum.
  477
  478life_quorum(Quorum, LifeQuorum) :-
  479    quorum(Quorum),
  480    (   failed(Failed),
  481        Failed \== 0,
  482        LifeQuorum is Quorum /\ \Failed,
  483        majority(LifeQuorum, Quorum)
  484    ->  true
  485    ;   LifeQuorum = Quorum
  486    ).
  487
  488
  489		 /*******************************
  490		 *        NETWORK STATUS	*
  491		 *******************************/
  492
  493:- admin_key(quorum, Key),
  494   listen(paxos_changed(Key, Quorum),
  495          update_quorum(Quorum)).  496:- admin_key(dead, Key),
  497   listen(paxos_changed(Key, Death),
  498          update_dead(Death)).  499
  500update_quorum(Proposed) :-
  501    debug(paxos(node), 'Received quorum proposal 0x~16r', [Proposed]),
  502    quorum(Proposed),
  503    !.
  504update_quorum(Proposed) :-
  505    leaving,
  506    !,
  507    update(quorum(Proposed)).
  508update_quorum(Proposed) :-
  509    node(Node),
  510    Proposed /\ (1<<Node) =\= 0,
  511    !,
  512    update(quorum(Proposed)).
  513update_quorum(Proposed) :-
  514    node(Node),
  515    NewQuorum is Proposed \/ (1<<Node),
  516    update(quorum(NewQuorum)),
  517    debug(paxos(node), 'I''m not in the quorum! Proposing 0x~16r', [NewQuorum]),
  518    paxos_set_admin_bg(quorum, NewQuorum).
  519
  520update_dead(Proposed) :-
  521    debug(paxos(node), 'Received dead proposal 0x~16r', [Proposed]),
  522    dead(Proposed),
  523    !.
  524update_dead(Proposed) :-
  525    leaving,
  526    !,
  527    update(dead(Proposed)).
  528update_dead(Proposed) :-
  529    node(Node),
  530    Proposed /\ (1<<Node) =:= 0,
  531    !,
  532    update(dead(Proposed)).
  533update_dead(Proposed) :-
  534    node(Node),
  535    NewDead is Proposed /\ \(1<<Node),
  536    update(dead(NewDead)),
  537    paxos_set_admin_bg(dead, NewDead).
  538
  539update(Clause) :-
  540    functor(Clause, Name, Arity),
  541    functor(Generic, Name, Arity),
  542    (   clause(Generic, true, Ref)
  543    ->  asserta(Clause),
  544        erase(Ref)
  545    ;   asserta(Clause)
  546    ).
  547
		 /*******************************
		 *         INBOUND EVENTS	*
		 *******************************/

%!  paxos_message(?Message)
%
%   Handle inbound actions from our peers.  paxos_initialize/1 registers
%   this predicate as the listener for paxos(Message) broadcasts.
%   Defined values for Message are:
%
%     - prepare(+Key,-Node,-Gen,+Value)
%     A request message to set Key to Value.  Returns the current
%     generation at which we have a value, or `0` for Gen, and
%     our node id for Node.  If we have no ledger entry for Key,
%     ledger_create/3 is called for it at generation `0`.
%     - accept(+Key,-Node,+Gen,-GenA,+Value)
%     A request message to set Key to Value if Gen is newer than
%     the generation we have for Key.  In that case GenA is Gen.
%     Otherwise we reject using GenA = `nack`.
%     - changed(+Key,+Gen,+Value,+Acceptors)
%     The leader got enough accepts for setting Key to Value at Gen.
%     Acceptors is the set of nodes that accepted this value.  We
%     rebroadcast it locally as paxos_changed(Key,Value).
%     - learn(+Key,-Node,+Gen,-GenA,+Value)
%     Request message performing phase one for replication to learner
%     nodes.
%     - learned(+Key,+Gen,+Value,+Acceptors)
%     Phase two of the replication.  Confirm the newly learned knowledge.
%     - retrieve(+Key,-Node,-Gen,-Value)
%     A request message to retrieve our value for Key.  Also provides
%     our node id and the generation.  This clause fails if we have no
%     ledger entry for Key.
%     - forget(+Nodes)
%     Forget the existence of Nodes.
%     - node(-Node,-Quorum,-Dead)
%     Get my view about the network.  Node is the (integer) node id of
%     this node, Quorum is the idea of the quorum and Dead is the idea
%     about non-responsive nodes.  While we are still acquiring a node
%     id (see paxos_assign_node/1), we answer with Node = `self`,
%     Quorum = `0` and our random salt as Dead.
%     - claim_node(+Node,-Ok)
%     Ok is `false` if Node is our own node id and `true` otherwise.
%     A starting node uses this to verify that Node is still free.
%
%   @tbd: originally the changed was handled by a get and, when not
%   successful, with a new set, named _paxos_audit_.  I don't really see
%   why we need this.

paxos_message(prepare(Key,Node,Gen,Value)) :-
    node(Node),
    (   ledger(Key, Gen, _)
    ->  true
    ;   Gen = 0,                        % unknown key: start at gen 0
        ledger_create(Key, Gen, Value)
    ),
    debug(paxos, 'Prepared ~p-~p@~d', [Key,Value,Gen]).
paxos_message(accept(Key,Node,Gen,GenA,Value)) :-
    node(Node),
    (   ledger_update(Key, Gen, Value)
    ->  debug(paxos, 'Accepted ~p-~p@~d', [Key,Value,Gen]),
        GenA = Gen
    ;   debug(paxos, 'Rejected ~p-~p@~d', [Key,Value,Gen]),
        GenA = nack
    ).
paxos_message(changed(Key,Gen,Value,Acceptors)) :-
    debug(paxos, 'Changed ~p-~p@~d for ~p', [Key, Value, Gen, Acceptors]),
    ledger_update_holders(Key,Gen,Acceptors),
    broadcast(paxos_changed(Key,Value)).
paxos_message(learn(Key,Node,Gen,GenA,Value)) :-
    node(Node),
    debug(paxos, 'Learn ~p-~p@~p?', [Key, Value, Gen]),
    (   ledger_learn(Key,Gen,Value)
    ->  debug(paxos, 'Learned ~p-~p@~d', [Key,Value,Gen]),
        GenA = Gen
    ;   debug(paxos, 'Rejected ~p@~d', [Key, Gen]),
        GenA = nack
    ).
paxos_message(learned(Key,Gen,_Value,Acceptors)) :-
    ledger_update_holders(Key,Gen,Acceptors).
paxos_message(retrieve(Key,Node,K,Value)) :-
    node(Node),
    debug(paxos, 'Retrieving ~p', [Key]),
    ledger(Key,K,Value),
    debug(paxos, 'Retrieved ~p-~p@~d', [Key,Value,K]),
    !.                                  % at most one answer per node
paxos_message(forget(Nodes)) :-
    ledger_forget(Nodes).
paxos_message(node(Node,Quorum,Dead)) :-
    (   node(Node),
        quorum(Quorum),
        dead(Dead)
    ->  true
    ;   salt(Salt),                     % still starting: identify by salt
        Node = self,
        Quorum = 0,
        Dead = Salt
    ).
paxos_message(claim_node(Node, Ok)) :-
    (   node(Node)
    ->  Ok = false
    ;   Ok = true
    ).
  641
  642
		 /*******************************
		 *     KEY-VALUE OPERATIONS	*
		 *******************************/

%!  paxos_set(+Term) is semidet.
%
%   Equivalent to paxos_key(Term,Key), paxos_set(Key,Term).  I.e., Term
%   is a ground compound term and its key is the name/arity pair.  This
%   version provides compatibility with older versions of this library.

%!  paxos_set(+Key, +Value) is semidet.
%!  paxos_set(+Key, +Value, +Options) is semidet.
%
%   Negotiate to have Key-Value recorded in the ledger for each of the
%   quorum's members.  This predicate succeeds if a majority of the
%   quorum accepts the proposed term and no member rejects it.  If no
%   entry for Key exists in the Paxon's ledger, one is silently created.
%   paxos_set/3 retries the transaction several times (default: 20)
%   before failing.  Failure is rare and is usually the result of a
%   collision of two or more writers writing to the same key at
%   precisely the same time.  On failure, it may be useful to wait some
%   random period of time, and then retry the transaction.  With a retry
%   count of zero, paxos_set/3 succeeds iff the first ballot succeeds.
%
%   On success, paxos_set/3 broadcasts paxos(log(Key,Value,Acceptors,Gen))
%   and a changed(Key,Gen,Value,Acceptors) message to the network.
%   Receivers turn the latter into a paxos_changed(Key,Value) event.
%
%   Options processed:
%
%     - retry(Retries)
%     is a non-negative integer specifying the number of retries that
%     will be performed before a set is abandoned.  Defaults to the
%     _setting_ `max_sets` (20).
%     - timeout(+Seconds)
%     Max time to wait for the forum to reply.  Defaults to the
%     _setting_ `response_timeout` (0.020, 20ms).
%
%   @arg Key and Value must be ground; an instantiation error is raised
%   otherwise.
%   @tbd If the Value is already current, should we simply do nothing?

paxos_set(Term) :-
    paxos_key(Term, Key),
    paxos_set(Key, Term, []).

paxos_set(Key, Value) :-
    paxos_set(Key, Value, []).

paxos_set(Key, Value, Options) :-
    must_be(ground, Key-Value),
    paxos_initialize,
    option(retry(Retries), Options, Retries),
    option(timeout(TMO), Options, TMO),
    apply_default(Retries, max_sets),
    apply_default(TMO, response_timeout),
    % Np/Rp (and Na/Ra below) are shared with the message templates so
    % that collect/7 can pick the replying node and its answer.
    paxos_message(prepare(Key,Np,Rp,Value), TMO, Prepare),
    % One ballot per between/3 solution; any failure below retries.
    between(0, Retries, _),
      life_quorum(Quorum, Alive),
      debug(paxos, 'Set: ~p -> ~p', [Key, Value]),
      % Phase 1: collect the current generations from the quorum.
      collect(Quorum, false, Np, Rp, Prepare, Rps, PrepNodes),
      debug(paxos, 'Set: quorum: 0x~16r, prepared by 0x~16r, gens ~p',
            [Quorum, PrepNodes, Rps]),
      majority(PrepNodes, Quorum),
      max_list(Rps, K),
      succ(K, K1),
      % Phase 2: ask to accept at generation K1; stop at the first nack.
      paxos_message(accept(Key,Na,K1,Ra,Value), TMO, Accept),
      collect(Alive, Ra == nack, Na, Ra, Accept, Ras, AcceptNodes),
      majority(AcceptNodes, Quorum),
      intersecting(PrepNodes, AcceptNodes),
      % NOTE(review): c_element/3 is defined elsewhere; presumably it
      % checks that all accept replies agree on K1.
      c_element(Ras, K, K1),
      % Phase 3: announce the agreement.
      broadcast(paxos(log(Key,Value,AcceptNodes,K1))),
      paxos_message(changed(Key,K1,Value,AcceptNodes), -, Changed),
      broadcast(Changed),
      update_failed(set, Quorum, AcceptNodes),
    !.
  717
  718apply_default(Var, Setting) :-
  719    var(Var),
  720    !,
  721    setting(Setting, Var).
  722apply_default(_, _).
  723
  724majority(SubSet, Set) :-
  725    popcount(SubSet) >= (popcount(Set)+2)//2.
  726
  727intersecting(Set1, Set2) :-
  728    Set1 /\ Set2 =\= 0.
  729
  730
%!  collect(+Quorum, :Stop, ?Node, ?Template, ?Message,
%!          -Result, -NodeSet) is semidet.
%
%   Perform a broadcast request using Message.  Node and Template share
%   variables with Message and extract the replying node and the result
%   value from each reply.  Result is the list of instantiations for
%   Template received and NodeSet is the set (bitmask) of Node values
%   that replied.  |NodeSet| equals length(Result) provided every node
%   replies at most once.  Collection ends in one of three ways:
%
%     - all members of Quorum have replied;
%     - a reply makes Stop succeed (that reply is not recorded);
%     - broadcast_request/1 has no more replies (presumably at the
%       configured timeout).
%
%   @tbd If we get a `nack` we can stop.  Callers can already do this by
%   passing a Stop goal such as `Ra == nack`.

collect(Quorum, Stop, Node, Template, Message, Result, NodeSet) :-
    State = state(0),                   % bitmap of nodes that replied
    L0 = [dummy|_],                     % open answer list; head is dropped
    Answers = list(L0),                 % arg 1 points at the last cell
    (   broadcast_request(Message),
        (   Stop
        ->  !,                          % cut is local to the condition:
            fail                        % stop enumerating replies
        ;   true
        ),
        % Record a copy of this reply.  The nb_* updates survive
        % backtracking into broadcast_request/1 for the next reply.
        duplicate_term(Template, Copy),
        NewLastCell = [Copy|_],
        arg(1, Answers, LastCell),
        nb_linkarg(2, LastCell, NewLastCell),
        nb_linkarg(1, Answers, NewLastCell),
        arg(1, State, Replied0),
        Replied is Replied0 \/ (1<<Node),
        nb_setarg(1, State, Replied),
        Quorum /\ Replied =:= Quorum    % everybody in Quorum replied
    ->  true
    ;   true
    ),
    arg(1, State, NodeSet),
    arg(1, Answers, [_]),               % close the answer list
    L0 = [_|Result].
  768
  769%!  paxos_get(?Term) is semidet.
  770%
  771%   Equivalent to paxos_key(Term,Key), pasox_get(Key,Term).   I.e., Term
  772%   is a compound term and its key  is the name/arity pair. This version
  773%   provides compatibility with older versions of this library.
  774
  775%!  paxos_get(+Key, +Value) is semidet.
  776%!  paxos_get(+Key, +Value, +Options) is semidet.
  777%
  778%   unifies Term with the entry retrieved from the Paxon's ledger. If no
  779%   such entry exists in the member's local   cache,  then the quorum is
  780%   asked to provide a value,  which   is  verified  for consistency. An
  781%   implied paxos_set/1 follows. This predicate  succeeds if a term
  782%   with the same functor and arity exists   in  the Paxon's ledger, and
  783%   fails otherwise.
  784%
  785%   Options processed:
  786%
  787%     - retry(Retries)
  788%     is a non-negative integer specifying the number of retries that
  789%     will be performed before a set is abandoned.  Defaults to the
  790%     _setting_ `max_gets` (5).
  791%     - timeout(+Seconds)
  792%     Max time to wait for the forum to reply.  Defaults to the
  793%     _setting_ `response_timeout` (0.020, 20ms).
  794%
  795%   @arg Term is a compound. Any unbound variables are unified with
  796%   those provided in the ledger entry.
  797
  798paxos_get(Term) :-
  799    paxos_key(Term, Key),
  800    paxos_get(Key, Term, []).
  801paxos_get(Key, Value) :-
  802    paxos_get(Key, Value, []).
  803
  804paxos_get(Key, Value, _) :-
  805    ledger(Key, _Line, Value),
  806    !.
  807paxos_get(Key, Value, Options) :-
  808    paxos_initialize,
  809    option(retry(Retries), Options, Retries),
  810    option(timeout(TMO), Options, TMO),
  811    apply_default(Retries, max_gets),
  812    apply_default(TMO, response_timeout),
  813    Msg = Line-Value,
  814    paxos_message(retrieve(Key,Nr,Line,Value), TMO, Retrieve),
  815    node(Node),
  816    between(0, Retries, _),
  817      life_quorum(Quorum, Alive),
  818      QuorumA is Alive /\ \(1<<Node),
  819      collect(QuorumA, false, Nr, Msg, Retrieve, Terms, RetrievedNodes),
  820      debug(paxos, 'Retrieved: ~p from 0x~16r', [Terms, RetrievedNodes]),
  821      highest_vote(Terms, _Line-MajorityValue, Count),
  822      debug(paxos, 'Best: ~p with ~d votes', [MajorityValue, Count]),
  823      Count >= (popcount(QuorumA)+2)//2,
  824      debug(paxos, 'Retrieve: accept ~p', [MajorityValue]),
  825      update_failed(get, Quorum, RetrievedNodes),
  826      paxos_set(Key, MajorityValue),    % Is this needed?
  827    !.
  828
  829highest_vote(Terms, Term, Count) :-
  830    msort(Terms, Sorted),
  831    count_votes(Sorted, Counted),
  832    sort(1, >, Counted, [Count-Term|_]).
  833
  834count_votes([], []).
  835count_votes([H|T0], [N-H|T]) :-
  836    count_same(H, T0, 1, N, R),
  837    count_votes(R, T).
  838
  839count_same(H, [Hc|T0], C0, C, R) :-
  840    H == Hc,
  841    !,
  842    C1 is C0+1,
  843    count_same(H, T0, C1, C, R).
  844count_same(_, R, C, C, R).
  845
  846%!  paxos_key(+Term, -Key) is det.
  847%
  848%   Compatibility to allow for paxos_get/1, paxos_set/1, etc. The key of
  849%   a compound term is a term `'$c'(Name,Arity)`.   Note  that we do not
  850%   use `Name/Arity` and `X/Y` is  naturally   used  to organize keys as
  851%   hierachical _paths_.
  852
  853paxos_key(Compound, '$c'(Name,Arity)) :-
  854    compound(Compound), !,
  855    compound_name_arity(Compound, Name, Arity).
  856paxos_key(Compound, _) :-
  857    must_be(compound, Compound).
  858
  859
  860		 /*******************************
  861		 *          REPLICATION		*
  862		 *******************************/
  863
  864%!  start_replicator
  865%
  866%   Start or signal the replicator thread  that there may be outstanding
  867%   replication work.  This is the case if
  868%
  869%     - The union of _quorum_ and _learners_ was extended, and thus
  870%       all data may need to be replicated to the new members.
  871%     - A paxos_set/3 was not fully acknowledged.
  872
  873start_replicator :-
  874    catch(thread_send_message(paxos_replicator, run),
  875          error(existence_error(_,_),_),
  876          fail),
  877    !.
  878start_replicator :-
  879    catch(thread_create(replicator, _,
  880                        [ alias(paxos_replicator),
  881                          detached(true)
  882                        ]),
  883          error(permission_error(_,_,_),_),
  884          true).
  885
  886replicator :-
  887    setting(replication_rate, ReplRate),
  888    ReplSleep is 1/ReplRate,
  889    node(Node),
  890    debug(paxos(replicate), 'Starting replicator', []),
  891    State = state(idle),
  892    repeat,
  893      quorum(Quorum),
  894      dead(Dead),
  895      LifeQuorum is Quorum /\ \Dead,
  896      (   LifeQuorum /\ \(1<<Node) =:= 0
  897      ->  debug(paxos(replicate),
  898                'Me: ~d, Quorum: 0x~16r, Dead: 0x~16r: I''m alone, waiting ...',
  899                [Node, Quorum, Dead]),
  900          thread_get_message(_)
  901      ;   (   paxos_replicate_key(LifeQuorum, Key, [])
  902          ->  replicated(State, key(Key)),
  903              thread_self(Me),
  904              thread_get_message(Me, _, [timeout(ReplSleep)])
  905          ;   replicated(State, idle),
  906              thread_get_message(_)
  907          )
  908      ),
  909      fail.
  910
  911replicated(State, key(_Key)) :-
  912    arg(1, State, idle),
  913    !,
  914    debug(paxos(replicate), 'Start replicating ...', []),
  915    nb_setarg(1, State, 1).
  916replicated(State, key(_Key)) :-
  917    !,
  918    arg(1, State, C0),
  919    C is C0+1,
  920    nb_setarg(1, State, C).
  921replicated(State, idle) :-
  922    arg(1, State, idle),
  923    !.
  924replicated(State, idle) :-
  925    arg(1, State, Count),
  926    debug(paxos(replicate), 'Replicated ~D keys', [Count]),
  927    nb_setarg(1, State, idle).
  928
  929
  930%!  paxos_replicate_key(+Nodes:bitmap, ?Key, +Options) is det.
  931%
  932%   Replicate a Key to Nodes.  If Key is unbound, a random key is
  933%   selected.
  934%
  935%     - timeout(+Seconds)
  936%     Max time to wait for the forum to reply.  Defaults to the
  937%     _setting_ `response_timeout` (0.020, 20ms).
  938
  939paxos_replicate_key(Nodes, Key, Options) :-
  940    replication_key(Nodes, Key),
  941    option(timeout(TMO), Options, TMO),
  942    apply_default(TMO, response_timeout),
  943    ledger_current(Key, Gen, Value, Holders),
  944    paxos_message(learn(Key,Na,Gen,Ga,Value), TMO, Learn),
  945    collect(Nodes, Ga == nack, Na, Ga, Learn, _Gas, LearnedNodes),
  946    NewHolders is Holders \/ LearnedNodes,
  947    paxos_message(learned(Key,Gen,Value,NewHolders), -, Learned),
  948    broadcast(Learned),
  949    update_failed(replicate, Nodes, LearnedNodes).
  950
  951replication_key(_Nodes, Key) :-
  952    ground(Key),
  953    !.
  954replication_key(Nodes, Key) :-
  955    (   Nth is 1+random(popcount(Nodes))
  956    ;   Nth = 1
  957    ),
  958    call_nth(needs_replicate(Nodes, Key), Nth),
  959    !.
  960
  961needs_replicate(Nodes, Key) :-
  962    ledger_current(Key, _Gen, _Value, Holders),
  963    Nodes /\ \Holders =\= 0,
  964    \+ admin_key(_, Key).
  965
  966
  967		 /*******************************
  968		 *      KEY CHANGE EVENTS	*
  969		 *******************************/
  970
  971%!  paxos_on_change(?Term, :Goal) is det.
  972%!  paxos_on_change(?Key, ?Value, :Goal) is det.
  973%
  974%   executes the specified Goal  when   Key  changes.  paxos_on_change/2
  975%   listens for paxos(changed(Key,Value) notifications   for  Key, which
  976%   are emitted as the result   of  successful paxos_set/3 transactions.
  977%   When one is received for Key, then   Goal  is executed in a separate
  978%   thread of execution.
  979%
  980%   @arg Term is a compound, identical to that used for
  981%   paxos_get/1.
  982%   @arg Goal is one of:
  983%     - a callable atom or term, or
  984%     - the atom =ignore=, which causes monitoring for Term to be
  985%       discontinued.
  986
  987paxos_on_change(Term, Goal) :-
  988    paxos_key(Term, Key),
  989    paxos_on_change(Key, Term, Goal).
  990
  991paxos_on_change(Key, Value, Goal) :-
  992    Goal = _:Plain,
  993    must_be(callable, Plain),
  994    (   Plain == ignore
  995    ->  unlisten(paxos_user, paxos_changed(Key,Value))
  996    ;   listen(paxos_user, paxos_changed(Key,Value),
  997               key_changed(Key, Value, Goal)),
  998        paxos_initialize
  999    ).
 1000
 1001key_changed(_Key, _Value, Goal) :-
 1002    E = error(_,_),
 1003    catch(thread_create(Goal, _, [detached(true)]),
 1004          E, key_error(E)).
 1005
 1006key_error(error(permission_error(create, thread, _), _)) :-
 1007    !.
 1008key_error(E) :-
 1009    print_message(error, E).
 1010
 1011
 1012		 /*******************************
 1013		 *            HOOKS		*
 1014		 *******************************/
 1015
 1016%!  node(-Node) is det.
 1017%
 1018%   Get the node ID for this paxos node.
 1019
 1020%!  quorum(-Quorum) is det.
 1021%
%   Get the current quorum as a bitmask.
 1023
 1024%!  paxos_message(+PaxOS, +TimeOut, -BroadcastMessage) is det.
 1025%
 1026%   Transform a basic PaxOS message in   a  message for the broadcasting
 1027%   service. This predicate is hooked   by paxos_message_hook/3 with the
 1028%   same signature.
 1029%
 1030%   @arg TimeOut is one of `-` or a time in seconds.
 1031
 1032paxos_message(Paxos:From, TMO, Message) :-
 1033    paxos_message_hook(paxos(Paxos):From, TMO, Message),
 1034    !.
 1035paxos_message(Paxos, TMO, Message) :-
 1036    paxos_message_hook(paxos(Paxos), TMO, Message),
 1037    !.
 1038paxos_message(Paxos, TMO, Message) :-
 1039    throw(error(mode_error(det, fail,
 1040                           paxos:paxos_message_hook(Paxos, TMO, Message)), _)).
 1041
 1042
 1043		 /*******************************
 1044		 *           STORAGE		*
 1045		 *******************************/
 1046
 1047:- dynamic
 1048    paxons_ledger/4.                    % Key, Gen, Value, Holders
 1049
 1050%!  ledger_current(?Key, ?Gen, ?Value, ?Holders) is nondet.
 1051%
 1052%   True when Key is a known key in my ledger.
 1053
 1054ledger_current(Key, Gen, Value, Holders) :-
 1055    paxons_ledger(Key, Gen, Value, Holders),
 1056    valid(Holders).
 1057
 1058
 1059%!  ledger(+Key, -Gen, -Value) is semidet.
 1060%
 1061%   True if the ledger has Value associated  with Key at generation Gen.
 1062%   Note that if the value is  not   yet  acknowledged  by the leader we
 1063%   should not use it.
 1064
 1065ledger(Key, Gen, Value) :-
 1066    paxons_ledger(Key, Gen, Value0, Holders),
 1067    valid(Holders),
 1068    !,
 1069    Value = Value0.
 1070
 1071%!  ledger_create(+Key, +Gen, +Value) is det.
 1072%
 1073%   Create a new Key-Value pair  at   generation  Gen.  This is executed
 1074%   during the preparation phase.
 1075
 1076ledger_create(Key, Gen, Value) :-
 1077    get_time(Now),
 1078    asserta(paxons_ledger(Key, Gen, Value, created(Now))).
 1079
 1080%!  ledger_update(+Key, +Gen, +Value) is semidet.
 1081%
 1082%   Update Key to Value if the  current   generation  is older than Gen.
 1083%   This reflects the accept phase of the protocol.
 1084
 1085ledger_update(Key, Gen, Value) :-
 1086    paxons_ledger(Key, Gen0, _Value, _Holders),
 1087    !,
 1088    Gen > Gen0,
 1089    get_time(Now),
 1090    asserta(paxons_ledger(Key, Gen, Value, accepted(Now))),
 1091    (   Gen0 == 0
 1092    ->  retractall(paxons_ledger(Key, Gen0, _, _))
 1093    ;   true
 1094    ).
 1095
 1096%!  ledger_update_holders(+Key, +Gen, +Holders) is det.
 1097%
 1098%   The leader acknowledged that Key@Gen represents a valid new
 1099
 1100ledger_update_holders(Key, Gen, Holders) :-
 1101    clause(paxons_ledger(Key, Gen, Value, Holders0), true, Ref),
 1102    !,
 1103    (   Holders0 == Holders
 1104    ->  true
 1105    ;   asserta(paxons_ledger(Key, Gen, Value, Holders)),
 1106        erase(Ref)
 1107    ),
 1108    clean_key(Holders0, Key, Gen).
 1109
 1110clean_key(Holders, _Key, _Gen) :-
 1111    valid(Holders),
 1112    !.
 1113clean_key(_, Key, Gen) :-
 1114    (   clause(paxons_ledger(Key, Gen0, _Value, _Holders0), true, Ref),
 1115        Gen0 < Gen,
 1116        erase(Ref),
 1117        fail
 1118    ;   true
 1119    ).
 1120
 1121
 1122%!  ledger_learn(+Key,+Gen,+Value) is semidet.
 1123%
 1124%   We received a learn event.
 1125
 1126ledger_learn(Key,Gen,Value) :-
 1127    paxons_ledger(Key, Gen0, Value0, _Holders),
 1128    !,
 1129    (   Gen == Gen0,
 1130        Value == Value0
 1131    ->  true
 1132    ;   Gen > Gen0
 1133    ->  get_time(Now),
 1134        asserta(paxons_ledger(Key, Gen, Value, learned(Now)))
 1135    ).
 1136ledger_learn(Key,Gen,Value) :-
 1137    get_time(Now),
 1138    asserta(paxons_ledger(Key, Gen, Value, learned(Now))).
 1139
 1140%!  ledger_forget(+Nodes) is det.
 1141%
 1142%   Remove Nodes from all ledgers.  This is executed in a background
 1143%   thread.
 1144
 1145ledger_forget(Nodes) :-
 1146    catch(thread_create(ledger_forget_threaded(Nodes), _,
 1147                        [ detached(true)
 1148                        ]),
 1149          error(permission_error(create, thread, _), _),
 1150          true).
 1151
 1152ledger_forget_threaded(Nodes) :-
 1153    debug(paxos(node), 'Forgetting 0x~16r', [Nodes]),
 1154    forall(ledger_current(Key, Gen, _Value, Holders),
 1155           ledger_forget(Nodes, Key, Gen, Holders)),
 1156    debug(paxos(node), 'Forgotten 0x~16r', [Nodes]).
 1157
 1158ledger_forget(Nodes, Key, Gen, Holders) :-
 1159    NewHolders is Holders /\ \Nodes,
 1160    (   NewHolders \== Holders,
 1161        ledger_update_holders(Key, Gen, NewHolders)
 1162    ->  true
 1163    ;   true
 1164    ).
 1165
 1166valid(Holders) :-
 1167    integer(Holders).
 1168
 1169
 1170		 /*******************************
 1171		 *             UTIL		*
 1172		 *******************************/
 1173
 1174%!  c_element(+NewList, +Old, -Value)
 1175%
 1176%   A Muller c-element is a logic block  used in asynchronous logic. Its
 1177%   output assumes the value of its  input   iff  all  of its inputs are
 1178%   identical. Otherwise, the output retains its original value.
 1179
 1180c_element([New | More], _Old, New) :-
 1181    forall(member(N, More), N == New),
 1182    !.
 1183c_element(_List, Old, Old).
 1184
 1185%!  arg_union(+Arg, +ListOfTerms, -Set) is det.
 1186%
 1187%   Get all the nth args from ListOfTerms  and   do  a  set union on the
 1188%   result.
 1189
 1190arg_union(Arg, NodeStatusList, Set) :-
 1191    maplist(arg(Arg), NodeStatusList, Sets),
 1192    list_union(Sets, Set).
 1193
 1194list_union(Sets, Set) :-
 1195    list_union(Sets, 0, Set).
 1196
 1197list_union([], Set, Set).
 1198list_union([H|T], Set0, Set) :-
 1199    Set1 is Set0 \/ H,
 1200    list_union(T, Set1, Set)