/*  Part of SWI-Prolog

    Author:        Jeffrey Rosenwald, Jan Wielemaker
    E-mail:        jeffrose@acm.org
    WWW:           http://www.swi-prolog.org
    Copyright (c)  2009-2018, Jeffrey Rosenwald
                   CWI, Amsterdam
    All rights reserved.

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions
    are met:

    1. Redistributions of source code must retain the above copyright
       notice, this list of conditions and the following disclaimer.

    2. Redistributions in binary form must reproduce the above copyright
       notice, this list of conditions and the following disclaimer in
       the documentation and/or other materials provided with the
       distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
    POSSIBILITY OF SUCH DAMAGE.
*/

:- module(paxos,
          [ paxos_get/1,                        % ?Term
            paxos_get/2,                        % +Key, -Value
            paxos_get/3,                        % +Key, -Value, +Options
            paxos_set/1,                        % ?Term
            paxos_set/2,                        % +Key, +Value
            paxos_set/3,                        % +Key, +Value, +Options
            paxos_on_change/2,                  % ?Term, +Goal
            paxos_on_change/3,                  % ?Key, ?Value, +Goal

            paxos_initialize/1,                 % +Options
                                                % Hook support
            paxos_replicate_key/3               % +Nodes, ?Key, +Options
          ]).
:- use_module(library(broadcast)).
:- use_module(library(debug)).
:- use_module(library(lists)).
:- use_module(library(settings)).
:- use_module(library(option)).
:- use_module(library(error)).
:- use_module(library(apply)).
:- use_module(library(solution_sequences)).

A Replicated Data Store

This module provides a replicated data store that is coordinated using a variation on Lamport's Paxos consensus protocol. The original method is described in his paper entitled "The Part-time Parliament", which was published in 1998. The algorithm is tolerant of non-Byzantine failure, that is, late or lost delivery or reply, but not senseless delivery or reply. The present algorithm takes advantage of the convenience offered by multicast to the quorum's membership, who can remain anonymous and who can come and go as they please without affecting Liveness or Safety properties.

Paxos' quorum is a set of one or more attentive members, whose processes respond to queries within some known time limit (< 20ms), which includes roundtrip delivery delay. This property is easy to satisfy given that every coordinator is necessarily a member of the quorum as well, and a quorum of one is permitted. An inattentive member (e.g. one whose actions are late or lost) is deemed to be "not-present" for the purposes of the present transaction and consistency cannot be assured for that member. As long as there is at least one attentive member of the quorum, then persistence of the database is assured.

Each member maintains a ledger of terms along with information about when they were originally recorded. The member's ledger is deterministic. That is to say that there can only be one entry per functor/arity combination. No member will accept a new term proposal that has a line number that is equal-to or lower-than the one that is already recorded in the ledger.

Paxos is a three-phase protocol:

1: A coordinator first prepares the quorum for a new proposal by broadcasting a proposed term. The quorum responds by returning the last known line number for that functor/arity combination that is recorded in their respective ledgers.
2: The coordinator selects the highest line number it receives, increments it by one, and then asks the quorum to finally accept the new term with the new line number. The quorum checks their respective ledgers once again and if there is still no other ledger entry for that functor/arity combination that is equal-to or higher than the specified line, then each member records the term in the ledger at the specified line. The member indicates consent by returning the specified line number back to the coordinator. If consent is withheld by a member, then the member returns a nack instead. The coordinator requires unanimous consent. If it isn't achieved then the proposal fails and the coordinator must start over from the beginning.
3: Finally, the coordinator concludes the successful negotiation by broadcasting the agreement to the quorum in the form of a paxos_changed(Key,Value) event. This is the only event that should be of interest to user programs.
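
The line-number bookkeeping in the three phases above can be sketched in isolation. This is a minimal illustration, not the library's implementation: next_line/2, member_reply/3 and unanimous/2 are hypothetical helper names, and the network exchange is replaced by plain lists of numbers.

```prolog
:- use_module(library(lists)).
:- use_module(library(apply)).

%!  next_line(+Lines, -Line) is semidet.
%   Phase 1 result: Lines are the line numbers returned by the
%   members; the coordinator proposes the highest one plus one.
next_line(Lines, Line) :-
    max_list(Lines, Max),
    Line is Max + 1.

%!  member_reply(+Proposed, +Recorded, -Reply) is det.
%   Phase 2, member side: consent (echo the line number) only if
%   the proposed line is higher than the recorded one, else nack.
member_reply(Proposed, Recorded, Reply) :-
    (   Proposed > Recorded
    ->  Reply = Proposed
    ;   Reply = nack
    ).

%!  unanimous(+Proposed, +Replies) is semidet.
%   Phase 2, coordinator side: every reply must echo Proposed.
unanimous(Proposed, Replies) :-
    Replies \== [],
    maplist(==(Proposed), Replies).
```

For example, if the members report lines 3, 7 and 5, the coordinator proposes line 8; a member that has meanwhile recorded line 8 answers nack, and the ballot fails.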

For practical reasons, we rely on the partially synchronous behavior (e.g. limited upper time bound for replies) of broadcast_request/1 over TIPC to ensure Progress. Perhaps more importantly, we rely on the fact that the TIPC broadcast listener state machine guarantees the atomicity of broadcast_request/1 at the process level, thus obviating the need for external mutual exclusion mechanisms.

Note that this algorithm does not guarantee the rightness of the value proposed. It only guarantees that if successful, the value proposed is identical for all attentive members of the quorum.

author
- Jeffrey Rosenwald (JeffRose@acm.org)
See also
- tipc_broadcast.pl, udp_broadcast.pl
license
- BSD-2
:- meta_predicate
    paxos_on_change(?, 0),
    paxos_on_change(?, ?, 0).

:- multifile
    paxos_message_hook/3.               % +PaxOS, +TimeOut, -Message

:- setting(max_sets, nonneg, 20,
           "Max Retries to get to an agreement").
:- setting(max_gets, nonneg, 5,
           "Max Retries to get a value from the forum").
:- setting(response_timeout, float, 0.020,
           "Max time to wait for a response").
:- setting(replication_rate, number, 1000,
           "Number of keys replicated per second").
:- setting(death_half_life, number, 10,
           "Half-life for failure score").
:- setting(death_score, number, 100,
           "Failure score threshold for considering a node dead").
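
The death_half_life setting feeds an exponentially decaying failure score; consider_dead1/1 further down in this file computes it as Score*(2**(-Passed/HalfLife)) + 10. The arithmetic can be tried on its own with the sketch below, where decayed_score/4 is a hypothetical helper name that is not part of the library.

```prolog
%!  decayed_score(+Score, +Passed, +HalfLife, -NewScore) is det.
%   The old Score halves for every HalfLife seconds that Passed
%   since the previous failure; each new failure adds 10.
decayed_score(Score, Passed, HalfLife, NewScore) :-
    NewScore is Score * 2 ** (-Passed / HalfLife) + 10.
```

With the default half-life of 10, a score of 10 followed by another failure 10 seconds later becomes 10*0.5 + 10; two failures at the same instant add up to 20.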
 paxos_initialize(+Options) is det
Initialize this Prolog process as a paxos node. The initialization requires an initialized and configured TIPC, UDP or other broadcast protocol. Calling this initialization may be omitted, in which case the equivalent of paxos_initialize([]) is executed lazily as part of the first paxos operation. Defined options:
node(?NodeID)
When instantiated, this node rejoins the network with the given node id. A fixed node id should be used if the node is configured for persistency; it causes the new node to receive updates for keys that have been created or modified since the node left the network. If NodeID is a variable, it is unified with the discovered NodeID.

NodeID must be a small non-negative integer as these identifiers are used in bitmaps.

:- dynamic  paxos_initialized/0.
:- volatile paxos_initialized/0.

paxos_initialize(_Options) :-
    paxos_initialized,
    !.
paxos_initialize(Options) :-
    with_mutex(paxos, paxos_initialize_sync(Options)).

paxos_initialize_sync(_Options) :-
    paxos_initialized,
    !.
paxos_initialize_sync(Options) :-
    at_halt(paxos_leave),
    listen(paxos, paxos(X), paxos_message(X)),
    paxos_assign_node(Options),
    start_replicator,
    asserta(paxos_initialized).

paxos_initialize :-
    paxos_initialize([]).


                 /*******************************
                 *            ADMIN             *
                 *******************************/
 paxos_get_admin(+Name, -Value) is semidet
 paxos_set_admin(+Name, +Value) is semidet
Get or set administrative keys. We use a wrapper so that we can hide the key identity.
admin_key(quorum, '$paxos_quorum').
admin_key(dead,  '$paxos_dead_nodes').

paxos_get_admin(Name, Value) :-
    admin_key(Name, Key),
    paxos_get(Key, Value).

paxos_set_admin(Name, Value) :-
    admin_key(Name, Key),
    paxos_set(Key, Value).

paxos_set_admin_bg(Name, Value) :-
    thread_create(ignore(paxos_set_admin(Name, Value)), _,
                  [ detached(true)
                  ]).


                 /*******************************
                 *           NODE DATA          *
                 *******************************/
 node(?NodeId)
 quorum(?Bitmap)
 dead(?Bitmap)
 failed(?Bitmap)
 failed(?NodeId, ?LastTried, ?Score)
Track our identity as well as the status of our peers in the network. NodeId is a small integer. Multiple NodeIds are combined in a Bitmap.
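
As an illustration of how node ids combine into a Bitmap, here is a small sketch using the same bit operations that appear throughout this file (`\/`, `/\`, `\`, `<<` and `lsb`). The predicate names nodeset_add/3, nodeset_del/3, nodeset_member/2 and nodeset_nodes/2 are hypothetical and only serve the example.

```prolog
%!  nodeset_add(+Node, +Set0, -Set) is det.
%   Set bit Node in the bitmap.
nodeset_add(Node, Set0, Set) :-
    Set is Set0 \/ (1<<Node).

%!  nodeset_del(+Node, +Set0, -Set) is det.
%   Clear bit Node in the bitmap.
nodeset_del(Node, Set0, Set) :-
    Set is Set0 /\ \(1<<Node).

%!  nodeset_member(+Node, +Set) is semidet.
%   True when bit Node is set.
nodeset_member(Node, Set) :-
    Set /\ (1<<Node) =\= 0.

%!  nodeset_nodes(+Set, -Nodes) is det.
%   Enumerate the node ids in Set, lowest first.
nodeset_nodes(0, []) :-
    !.
nodeset_nodes(Set, [Node|Nodes]) :-
    Node is lsb(Set),
    Rest is Set /\ \(1<<Node),
    nodeset_nodes(Rest, Nodes).
```

The bitmap 0b1101 thus stands for the nodes 0, 2 and 3.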
:- dynamic
    node/1,                             % NodeID
    quorum/1,                           % Bitmap
    failed/1,                           % Bitmap
    failed/3,                           % NodeID, LastTried, Score
    leaving/0,                          % Node is leaving
    dead/1,                             % Bitmap
    salt/1.                             % Unique key
:- volatile
    node/1,
    quorum/1,
    failed/1,
    failed/3,
    leaving/0,
    dead/1,
    salt/1.
 paxos_assign_node(+Options) is det
Assign a node for this paxos instance. If node is given as an option, this is the node id that is used. Otherwise the network is analysed and the system selects a new node.
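
The selection of a new node id can be sketched separately from the network query. The sketch below simplifies the code that follows: Running is assumed to be a plain list of node ids that are in use (the real code holds t(Node,Quorum,Dead,From) terms), and free_node/4 is a hypothetical name.

```prolog
:- use_module(library(solution_sequences)).
:- use_module(library(lists)).

%!  free_node(+Running, +Dead, +Offset, -Node) is semidet.
%   Node is the Offset-th id (counting from 1) that is neither in
%   the list Running nor set in the bitmap Dead.
free_node(Running, Dead, Offset, Node) :-
    call_nth(( between(0, 1000, Node),
               \+ memberchk(Node, Running),
               Dead /\ (1<<Node) =:= 0
             ),
             Offset).
```

The Offset lets several nodes that start at the same time pick different ids: the first starter takes the first free id, the second starter the next one, and so on.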
paxos_assign_node(Options) :-
    (   option(node(Node), Options)
    ->  node(Node)
    ;   node(_)
    ),                                          % already done
    !.
paxos_assign_node(Options) :-
    between(1, 20, Retry),
    option(node(Node), Options, Node),
    (   node(_)
    ->  permission_error(set, paxos_node, Node)
    ;   true
    ),
    retractall(dead(_)),
    retractall(quorum(_)),
    retractall(failed(_)),
    retractall(failed(_,_,_)),
    retractall(leaving),
    Salt is random(1<<63),
    asserta(salt(Salt)),
    paxos_message(node(N,Q,D):From, 0.25, NodeQuery),
    findall(t(N,Q,D,From),
            broadcast_request(NodeQuery),
            Network),
    select(t(self,0,Salt,Me), Network, AllNodeStatus),
    partition(starting, AllNodeStatus, Starting, Running),
    nth_starting(Starting, Salt, Offset),
    retractall(salt(_)),
    debug(paxos(node), 'Me@~p; starting: ~p; running: ~p',
          [Me, Starting, Running]),
    arg_union(2, Running, Quorum),
    arg_union(3, Running, Dead),
    (   var(Node)
    ->  (   call_nth(( between(0, 1000, Node),
                       \+ memberchk(t(Node,_,_,_), Running),
                       Dead /\ (1<<Node) =:= 0),
                     Offset)
        ->  debug(paxos(node), 'Assigning myself node ~d', [Node])
        ;   resource_error(paxos_nodes)
        )
    ;   memberchk(t(Node,_,_,_), Running)
    ->  permission_error(set, paxos_node, Node)
    ;   Rejoin = true
    ),
    asserta(node(Node)),
    (   claim_node(Node, Me)
    ->  !,
        asserta(dead(Dead)),
        set_quorum(Node, Quorum),
        (   Rejoin == true
        ->  paxos_rejoin
        ;   true
        )
    ;   % the format string needs one directive per argument
        debug(paxos(node), 'Node ~p already claimed; retrying (~p)',
              [Node, Retry]),
        retractall(node(Node)),
        fail
    ).

starting(t(self,_Quorum,_Salt,_Address)).

nth_starting(Starting, Salt, N) :-
    maplist(arg(3), Starting, Salts),
    sort([Salt|Salts], Sorted),
    nth1(N, Sorted, Salt),
    !.

claim_node(Node, Me) :-
    paxos_message(claim_node(Node, Ok):From, 0.25, NodeQuery),
    forall((   broadcast_request(NodeQuery),
               From \== Me,
               debug(paxos(node), 'Claim ~p ~p: ~p', [Node, From, Ok])
           ),
           Ok == true).

set_quorum(Node, Quorum0) :-
    Quorum is Quorum0 \/ (1<<Node),
    debug(paxos(node), 'Adding ~d to quorum (now 0x~16r)', [Node, Quorum]),
    asserta(quorum(Quorum)),
    paxos_set_admin(quorum, Quorum).
 paxos_rejoin
Re-join the network. This removes our node id from the dead set if it is listed there.
paxos_rejoin :-
    node(Node),
    repeat,
        paxos_get_admin(dead, Dead0),
        Dead is Dead0 /\ \(1<<Node),
        (   Dead == Dead0
        ->  true
        ;   paxos_set_admin(dead, Dead)
        ),
    !.
 paxos_leave is det
 paxos_leave(+Node) is det
Leave the network. The predicate paxos_leave/0 is called from at_halt/1 to ensure the node is deleted as the process dies. The paxos_leave/1 version is called to discard other nodes if they repeatedly did not respond to queries.
paxos_leave :-
    node(Node),
    !,
    asserta(leaving),
    paxos_leave(Node),
    Set is 1<<Node,
    paxos_message(forget(Set), -, Forget),
    broadcast(Forget),
    unlisten(paxos),
    retractall(leaving).
paxos_leave.

paxos_leave(Node) :-
    !,
    paxos_update_set(quorum, del(Node)),
    paxos_update_set(dead,   add(Node)).
paxos_leave(_).

paxos_update_set(Set, How) :-
    repeat,
      Term =.. [Set,Value],
      call(Term),
      (   How = add(Node)
      ->  NewValue is Value \/  (1<<Node)
      ;   How = del(Node)
      ->  NewValue is Value /\ \(1<<Node)
      ),
      (   Value == NewValue
      ->  true
      ;   paxos_set_admin(Set, NewValue)
      ),
    !.

                 /*******************************
                 *          NODE STATUS         *
                 *******************************/
 update_failed(+Action, +Quorum, +Alive) is det
We just sent the Quorum a message and got a reply from the set Alive.
Arguments:
Action- is one of set, get or replicate and indicates the intended action.
update_failed(Action, Quorum, Alive) :-
    Failed is Quorum /\ \Alive,
    alive(Alive),
    consider_dead(Failed),
    (   failed(Failed)
    ->  true
    ;   (   clause(failed(_Old), true, Ref)
        ->  asserta(failed(Failed)),
            erase(Ref),
            debug(paxos(node), 'Updated failed quorum to 0x~16r', [Failed])
        ;   asserta(failed(Failed))
        ),
        (   Action == set
        ->  start_replicator
        ;   true
        )
    ).

consider_dead(0) :-
    !.
consider_dead(Failed) :-
    Node is lsb(Failed),
    consider_dead1(Node),
    Rest is Failed /\ \(1<<Node),
    consider_dead(Rest).

consider_dead1(Node) :-
    clause(failed(Node, Last, Score), true, Ref),
    !,
    setting(death_half_life, HalfLife),
    setting(death_score, DeathScore),
    get_time(Now),
    Passed is Now-Last,
    NewScore is Score*(2**(-Passed/HalfLife)) + 10,
    asserta(failed(Node, Now, NewScore)),
    erase(Ref),
    (   NewScore < DeathScore
    ->  debug(paxos(node), 'Consider node ~d dead', [Node]),
        paxos_leave(Node)
    ;   true
    ).
consider_dead1(Node) :-
    get_time(Now),
    asserta(failed(Node, Now, 10)).

alive(Bitmap) :-
    (   clause(failed(Node, _Last, _Score), true, Ref),
        Bitmap /\ (1<<Node) =\= 0,
        erase(Ref),
        fail
    ;   true
    ).
 life_quorum(-Quorum, -LifeQuorum) is det
Find the Quorum and the living nodes from the Quorum. This is the set for which we wait. If the LifeQuorum is not a majority we address the whole Quorum.
To be done
- At some point in time we must remove a node from the quorum.
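
The choice between the living nodes and the whole quorum can be sketched as pure bitmap arithmetic. majority/2 below is copied from this file; addressed/3 is a hypothetical name that takes the failed set as an argument instead of reading the failed/1 fact.

```prolog
%!  majority(+SubSet, +Set) is semidet.
%   True when SubSet holds more than half of the members of Set.
majority(SubSet, Set) :-
    popcount(SubSet) >= (popcount(Set)+2)//2.

%!  addressed(+Quorum, +Failed, -Addressed) is det.
%   Wait for the living nodes only if they still form a majority
%   of Quorum; otherwise wait for the whole Quorum.
addressed(Quorum, Failed, Addressed) :-
    Live is Quorum /\ \Failed,
    (   Failed =\= 0,
        majority(Live, Quorum)
    ->  Addressed = Live
    ;   Addressed = Quorum
    ).
```

With a quorum of three nodes, one failed node leaves a majority of two, so only those two are awaited; with two failed nodes the single survivor is not a majority and the whole quorum is addressed again.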
life_quorum(Quorum, LifeQuorum) :-
    quorum(Quorum),
    (   failed(Failed),
        Failed \== 0,
        LifeQuorum is Quorum /\ \Failed,
        majority(LifeQuorum, Quorum)
    ->  true
    ;   LifeQuorum = Quorum
    ).


                 /*******************************
                 *        NETWORK STATUS        *
                 *******************************/

:- admin_key(quorum, Key),
   listen(paxos_changed(Key, Quorum),
          update_quorum(Quorum)).
:- admin_key(dead, Key),
   listen(paxos_changed(Key, Death),
          update_dead(Death)).

update_quorum(Proposed) :-
    debug(paxos(node), 'Received quorum proposal 0x~16r', [Proposed]),
    quorum(Proposed),
    !.
update_quorum(Proposed) :-
    leaving,
    !,
    update(quorum(Proposed)).
update_quorum(Proposed) :-
    node(Node),
    Proposed /\ (1<<Node) =\= 0,
    !,
    update(quorum(Proposed)).
update_quorum(Proposed) :-
    node(Node),
    NewQuorum is Proposed \/ (1<<Node),
    update(quorum(NewQuorum)),
    debug(paxos(node), 'I''m not in the quorum! Proposing 0x~16r', [NewQuorum]),
    paxos_set_admin_bg(quorum, NewQuorum).

update_dead(Proposed) :-
    debug(paxos(node), 'Received dead proposal 0x~16r', [Proposed]),
    dead(Proposed),
    !.
update_dead(Proposed) :-
    leaving,
    !,
    update(dead(Proposed)).
update_dead(Proposed) :-
    node(Node),
    Proposed /\ (1<<Node) =:= 0,
    !,
    update(dead(Proposed)).
update_dead(Proposed) :-
    node(Node),
    NewDead is Proposed /\ \(1<<Node),
    update(dead(NewDead)),
    paxos_set_admin_bg(dead, NewDead).

update(Clause) :-
    functor(Clause, Name, Arity),
    functor(Generic, Name, Arity),
    (   clause(Generic, true, Ref)
    ->  asserta(Clause),
        erase(Ref)
    ;   asserta(Clause)
    ).

                 /*******************************
                 *         INBOUND EVENTS       *
                 *******************************/
 paxos_message(?Message)
Handle inbound actions from our peers. Defined values for Message are:
prepare(+Key, -Node, -Gen, +Value)
A request message to set Key to Value. Returns the current generation at which we have a value, or 0, for Gen and our node id for Node.
accept(+Key, -Node, +Gen, -GenA, +Value)
A request message to set Key to Value if Gen is newer than the generation we have for Key. In that case GenA is Gen. Otherwise we reject using GenA = nack.
changed(+Key, +Gen, +Value, +Acceptors)
The leader got enough accepts for setting Key to Value at Gen. Acceptors is the set of nodes that accepted this value.
learn(+Key, -Node, +Gen, -GenA, +Value)
Request message performing phase one for replication to learner nodes.
learned(+Key, +Gen, +Value, +Acceptors)
Phase two of the replication. Confirm the newly learned knowledge.
retrieve(+Key, -Node, -Gen, -Value)
A request message to retrieve our value for Key. Also provides our node id and the generation.
forget(+Nodes)
Forget the existence of Nodes.
node(-Node, -Quorum, -Dead)
Get my view about the network. Node is the (integer) node id of this node, Quorum is the idea of the quorum and Dead is the idea about non-responsive nodes.
To be done
- Originally, the changed message was handled by a get and, when not successful, by a new set, named paxos_audit. I don't really see why we need this.
paxos_message(prepare(Key,Node,Gen,Value)) :-
    node(Node),
    (   ledger(Key, Gen, _)
    ->  true
    ;   Gen = 0,
        ledger_create(Key, Gen, Value)
    ),
    debug(paxos, 'Prepared ~p-~p@~d', [Key,Value,Gen]).
paxos_message(accept(Key,Node,Gen,GenA,Value)) :-
    node(Node),
    (   ledger_update(Key, Gen, Value)
    ->  debug(paxos, 'Accepted ~p-~p@~d', [Key,Value,Gen]),
        GenA = Gen
    ;   debug(paxos, 'Rejected ~p-~p@~d', [Key,Value,Gen]),
        GenA = nack
    ).
paxos_message(changed(Key,Gen,Value,Acceptors)) :-
    debug(paxos, 'Changed ~p-~p@~d for ~p', [Key, Value, Gen, Acceptors]),
    ledger_update_holders(Key,Gen,Acceptors),
    broadcast(paxos_changed(Key,Value)).
paxos_message(learn(Key,Node,Gen,GenA,Value)) :-
    node(Node),
    debug(paxos, 'Learn ~p-~p@~p?', [Key, Value, Gen]),
    (   ledger_learn(Key,Gen,Value)
    ->  debug(paxos, 'Learned ~p-~p@~d', [Key,Value,Gen]),
        GenA = Gen
    ;   debug(paxos, 'Rejected ~p@~d', [Key, Gen]),
        GenA = nack
    ).
paxos_message(learned(Key,Gen,_Value,Acceptors)) :-
    ledger_update_holders(Key,Gen,Acceptors).
paxos_message(retrieve(Key,Node,K,Value)) :-
    node(Node),
    debug(paxos, 'Retrieving ~p', [Key]),
    ledger(Key,K,Value),
    debug(paxos, 'Retrieved ~p-~p@~d', [Key,Value,K]),
    !.
paxos_message(forget(Nodes)) :-
    ledger_forget(Nodes).
paxos_message(node(Node,Quorum,Dead)) :-
    (   node(Node),
        quorum(Quorum),
        dead(Dead)
    ->  true
    ;   salt(Salt),
        Node = self,
        Quorum = 0,
        Dead = Salt
    ).
paxos_message(claim_node(Node, Ok)) :-
    (   node(Node)
    ->  Ok = false
    ;   Ok = true
    ).


                 /*******************************
                 *     KEY-VALUE OPERATIONS     *
                 *******************************/
 paxos_set(+Term) is semidet
Equivalent to paxos_key(Term,Key), paxos_set(Key,Term). I.e., Term is a ground compound term and its key is the name/arity pair. This version provides compatibility with older versions of this library.
 paxos_set(+Key, +Value) is semidet
 paxos_set(+Key, +Value, +Options) is semidet
Negotiates to have Key-Value recorded in the ledger for each of the quorum's members. This predicate succeeds if the quorum unanimously accepts the proposed term. If no such entry exists in the Paxon's ledger, then one is silently created. paxos_set/1 will retry the transaction several times (default: 20) before failing. Failure is rare and is usually the result of a collision of two or more writers writing to the same term at precisely the same time. On failure, it may be useful to wait some random period of time, and then retry the transaction. By specifying a retry count of zero, paxos_set/3 will succeed iff the first ballot succeeds.

On success, paxos_set/1 will also broadcast the term paxos_changed(Key,Value) to the quorum.

Options processed:

retry(Retries)
is a non-negative integer specifying the number of retries that will be performed before a set is abandoned. Defaults to the setting max_sets (20).
timeout(+Seconds)
Max time to wait for the forum to reply. Defaults to the setting response_timeout (0.020, 20ms).
Arguments:
Term- is a ground compound term.
To be done
- If the Value is already current, should we simply do nothing?
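
The advice above to wait a random period before retrying a failed transaction could be wrapped in a small helper. This is a sketch under stated assumptions: with_backoff/3 is a hypothetical predicate that is not part of this library, and the delay bound is chosen by the caller.

```prolog
:- meta_predicate with_backoff(0, +, +).

%!  with_backoff(:Goal, +Attempts, +MaxDelay) is semidet.
%   Try Goal up to Attempts times.  After each failed attempt but
%   the last, sleep a random time below MaxDelay seconds.
with_backoff(Goal, Attempts, MaxDelay) :-
    between(1, Attempts, N),
    (   call(Goal)
    ->  true
    ;   N < Attempts,
        Delay is random_float * MaxDelay,
        sleep(Delay),
        fail
    ),
    !.
```

A caller could then write, for example, `with_backoff(paxos_set(Key, Value, [retry(0)]), 5, 0.1)` to run up to five single-ballot attempts with a random pause of at most 100ms in between.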
paxos_set(Term) :-
    paxos_key(Term, Key),
    paxos_set(Key, Term, []).

paxos_set(Key, Value) :-
    paxos_set(Key, Value, []).

paxos_set(Key, Value, Options) :-
    must_be(ground, Key-Value),
    paxos_initialize,
    option(retry(Retries), Options, Retries),
    option(timeout(TMO), Options, TMO),
    apply_default(Retries, max_sets),
    apply_default(TMO, response_timeout),
    paxos_message(prepare(Key,Np,Rp,Value), TMO, Prepare),
    between(0, Retries, _),
      life_quorum(Quorum, Alive),
      debug(paxos, 'Set: ~p -> ~p', [Key, Value]),
      collect(Quorum, false, Np, Rp, Prepare, Rps, PrepNodes),
      debug(paxos, 'Set: quorum: 0x~16r, prepared by 0x~16r, gens ~p',
            [Quorum, PrepNodes, Rps]),
      majority(PrepNodes, Quorum),
      max_list(Rps, K),
      succ(K, K1),
      paxos_message(accept(Key,Na,K1,Ra,Value), TMO, Accept),
      collect(Alive, Ra == nack, Na, Ra, Accept, Ras, AcceptNodes),
      majority(AcceptNodes, Quorum),
      intersecting(PrepNodes, AcceptNodes),
      c_element(Ras, K, K1),
      broadcast(paxos(log(Key,Value,AcceptNodes,K1))),
      paxos_message(changed(Key,K1,Value,AcceptNodes), -, Changed),
      broadcast(Changed),
      update_failed(set, Quorum, AcceptNodes),
    !.

apply_default(Var, Setting) :-
    var(Var),
    !,
    setting(Setting, Var).
apply_default(_, _).

majority(SubSet, Set) :-
    popcount(SubSet) >= (popcount(Set)+2)//2.

intersecting(Set1, Set2) :-
    Set1 /\ Set2 =\= 0.
 collect(+Quorum, :Stop, ?Node, ?Template, ?Message, -Result, -NodeSet) is semidet
Perform a broadcast request using Message. Node and Template share with Message and extract the replying node and the result value from Message. Result is the list of instantiations for Template received and NodeSet is the set (bitmask) of Node values that replied, i.e. |NodeSet| is length(Result). The transfer stops if all members of the set Quorum responded or the configured timeout passed.
To be done
- If we get a nack we can stop
collect(Quorum, Stop, Node, Template, Message, Result, NodeSet) :-
    State = state(0),
    L0 = [dummy|_],
    Answers = list(L0),
    (   broadcast_request(Message),
        (   Stop
        ->  !,
            fail
        ;   true
        ),
        duplicate_term(Template, Copy),
        NewLastCell = [Copy|_],
        arg(1, Answers, LastCell),
        nb_linkarg(2, LastCell, NewLastCell),
        nb_linkarg(1, Answers, NewLastCell),
        arg(1, State, Replied0),
        Replied is Replied0 \/ (1<<Node),
        nb_setarg(1, State, Replied),
        Quorum /\ Replied =:= Quorum
    ->  true
    ;   true
    ),
    arg(1, State, NodeSet),
    arg(1, Answers, [_]),               % close the answer list
    L0 = [_|Result].
 paxos_get(?Term) is semidet
Equivalent to paxos_key(Term,Key), paxos_get(Key,Term). I.e., Term is a compound term and its key is the name/arity pair. This version provides compatibility with older versions of this library.
 paxos_get(+Key, -Value) is semidet
 paxos_get(+Key, -Value, +Options) is semidet
Unifies Value with the entry retrieved from the Paxon's ledger. If no such entry exists in the member's local cache, then the quorum is asked to provide a value, which is verified for consistency. An implied paxos_set/2 follows. This predicate succeeds if a term with the same functor and arity exists in the Paxon's ledger, and fails otherwise.

Options processed:

retry(Retries)
is a non-negative integer specifying the number of retries that will be performed before a get is abandoned. Defaults to the setting max_gets (5).
timeout(+Seconds)
Max time to wait for the forum to reply. Defaults to the setting response_timeout (0.020, 20ms).
Arguments:
Term- is a compound. Any unbound variables are unified with those provided in the ledger entry.
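
The consistency check on the retrieved values amounts to a vote: the reply that occurs most often wins, provided it was given by a majority of the peers that were asked. The sketch below illustrates this with library(aggregate) rather than the sorting code that follows; best_reply/3 and accepted_reply/3 are hypothetical names and the replies are assumed to be ground Line-Value pairs.

```prolog
:- use_module(library(aggregate)).
:- use_module(library(lists)).

%!  best_reply(+Replies, -Best, -Count) is semidet.
%   Best is the reply that occurs most often in Replies and Count
%   is its number of occurrences.  Fails if Replies is empty.
best_reply(Replies, Best, Count) :-
    sort(Replies, Unique),              % distinct replies
    aggregate_all(max(N, Reply),
                  ( member(Reply, Unique),
                    aggregate_all(count, member(Reply, Replies), N)
                  ),
                  max(Count, Best)).

%!  accepted_reply(+Replies, +Asked, -Best) is semidet.
%   Accept the best reply only if a majority of the Asked peers
%   gave it, using the same threshold as the code below.
accepted_reply(Replies, Asked, Best) :-
    best_reply(Replies, Best, Count),
    Count >= (Asked+2)//2.
```

With three peers asked, the replies `[3-a, 3-a, 2-b]` yield `3-a` with two votes, which is accepted; two differing replies yield no accepted value.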
paxos_get(Term) :-
    paxos_key(Term, Key),
    paxos_get(Key, Term, []).
paxos_get(Key, Value) :-
    paxos_get(Key, Value, []).

paxos_get(Key, Value, _) :-
    ledger(Key, _Line, Value),
    !.
paxos_get(Key, Value, Options) :-
    paxos_initialize,
    option(retry(Retries), Options, Retries),
    option(timeout(TMO), Options, TMO),
    apply_default(Retries, max_gets),
    apply_default(TMO, response_timeout),
    Msg = Line-Value,
    paxos_message(retrieve(Key,Nr,Line,Value), TMO, Retrieve),
    node(Node),
    between(0, Retries, _),
      life_quorum(Quorum, Alive),
      QuorumA is Alive /\ \(1<<Node),
      collect(QuorumA, false, Nr, Msg, Retrieve, Terms, RetrievedNodes),
      debug(paxos, 'Retrieved: ~p from 0x~16r', [Terms, RetrievedNodes]),
      highest_vote(Terms, _Line-MajorityValue, Count),
      debug(paxos, 'Best: ~p with ~d votes', [MajorityValue, Count]),
      Count >= (popcount(QuorumA)+2)//2,
      debug(paxos, 'Retrieve: accept ~p', [MajorityValue]),
      update_failed(get, Quorum, RetrievedNodes),
      paxos_set(Key, MajorityValue),    % Is this needed?
    !.

highest_vote(Terms, Term, Count) :-
    msort(Terms, Sorted),
    count_votes(Sorted, Counted),
    sort(1, >, Counted, [Count-Term|_]).

count_votes([], []).
count_votes([H|T0], [N-H|T]) :-
    count_same(H, T0, 1, N, R),
    count_votes(R, T).

count_same(H, [Hc|T0], C0, C, R) :-
    H == Hc,
    !,
    C1 is C0+1,
    count_same(H, T0, C1, C, R).
count_same(_, R, C, C, R).
 paxos_key(+Term, -Key) is det
Compatibility to allow for paxos_get/1, paxos_set/1, etc. The key of a compound term is a term '$c'(Name,Arity). Note that we do not use Name/Arity because X/Y is naturally used to organize keys as hierarchical paths.
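
A short sketch of the key mapping, under an assumed name so that it can be loaded on its own: compound_key/2 is hypothetical and mirrors the first clause of paxos_key/2.

```prolog
%!  compound_key(+Term, -Key) is semidet.
%   Key is '$c'(Name,Arity) for the compound Term.
compound_key(Term, '$c'(Name, Arity)) :-
    compound(Term),
    compound_name_arity(Term, Name, Arity).
```

The term `temperature(kitchen, 21)` thus gets the key `'$c'(temperature, 2)`, which cannot be confused with a path-like key such as `temperature/2` that a user may pass to paxos_set/2.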
paxos_key(Compound, '$c'(Name,Arity)) :-
    compound(Compound), !,
    compound_name_arity(Compound, Name, Arity).
paxos_key(Compound, _) :-
    must_be(compound, Compound).


                 /*******************************
                 *          REPLICATION         *
                 *******************************/
 start_replicator
Start or signal the replicator thread that there may be outstanding replication work. This is the case if
start_replicator :-
    catch(thread_send_message(paxos_replicator, run),
          error(existence_error(_,_),_),
          fail),
    !.
start_replicator :-
    catch(thread_create(replicator, _,
                        [ alias(paxos_replicator),
                          detached(true)
                        ]),
          error(permission_error(_,_,_),_),
          true).

replicator :-
    setting(replication_rate, ReplRate),
    ReplSleep is 1/ReplRate,
    node(Node),
    debug(paxos(replicate), 'Starting replicator', []),
    State = state(idle),
    repeat,
      quorum(Quorum),
      dead(Dead),
      LifeQuorum is Quorum /\ \Dead,
      (   LifeQuorum /\ \(1<<Node) =:= 0
      ->  debug(paxos(replicate),
                'Me: ~d, Quorum: 0x~16r, Dead: 0x~16r: I''m alone, waiting ...',
                [Node, Quorum, Dead]),
          thread_get_message(_)
      ;   (   paxos_replicate_key(LifeQuorum, Key, [])
          ->  replicated(State, key(Key)),
              thread_self(Me),
              thread_get_message(Me, _, [timeout(ReplSleep)])
          ;   replicated(State, idle),
              thread_get_message(_)
          )
      ),
      fail.
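
The "am I alone?" test in the loop above is plain bitmask arithmetic. The sketch below isolates it in a hypothetical helper, `alone/3`, with made-up node numbers. It is an illustration of the test, not part of the library:

```prolog
% alone(+Node, +Quorum, +Dead)
% True when no live quorum member other than Node remains.
% Hypothetical helper mirroring the test in replicator/0.
alone(Node, Quorum, Dead) :-
    LifeQuorum is Quorum /\ \Dead,          % drop dead nodes
    LifeQuorum /\ \(1<<Node) =:= 0.         % drop myself; anything left?

% Nodes 0, 1 and 2 form the quorum (0b111); we are node 0.
% ?- alone(0, 0b111, 0b110).   % nodes 1 and 2 are dead: succeeds
% ?- alone(0, 0b111, 0b100).   % node 1 is still alive: fails
```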

replicated(State, key(_Key)) :-
    arg(1, State, idle),
    !,
    debug(paxos(replicate), 'Start replicating ...', []),
    nb_setarg(1, State, 1).
replicated(State, key(_Key)) :-
    !,
    arg(1, State, C0),
    C is C0+1,
    nb_setarg(1, State, C).
replicated(State, idle) :-
    arg(1, State, idle),
    !.
replicated(State, idle) :-
    arg(1, State, Count),
    debug(paxos(replicate), 'Replicated ~D keys', [Count]),
    nb_setarg(1, State, idle).
 paxos_replicate_key(+Nodes:bitmap, ?Key, +Options) is det
Replicate a Key to Nodes. If Key is unbound, a random key is selected.
timeout(+Seconds)
 Maximum time to wait for the quorum to reply. Defaults to the setting response_timeout (0.020, 20ms).
paxos_replicate_key(Nodes, Key, Options) :-
    replication_key(Nodes, Key),
    option(timeout(TMO), Options, TMO),
    apply_default(TMO, response_timeout),
    ledger_current(Key, Gen, Value, Holders),
    paxos_message(learn(Key,Na,Gen,Ga,Value), TMO, Learn),
    collect(Nodes, Ga == nack, Na, Ga, Learn, _Gas, LearnedNodes),
    NewHolders is Holders \/ LearnedNodes,
    paxos_message(learned(Key,Gen,Value,NewHolders), -, Learned),
    broadcast(Learned),
    update_failed(replicate, Nodes, LearnedNodes).

replication_key(_Nodes, Key) :-
    ground(Key),
    !.
replication_key(Nodes, Key) :-
    (   Nth is 1+random(popcount(Nodes))
    ;   Nth = 1
    ),
    call_nth(needs_replicate(Nodes, Key), Nth),
    !.

needs_replicate(Nodes, Key) :-
    ledger_current(Key, _Gen, _Value, Holders),
    Nodes /\ \Holders =\= 0,
    \+ admin_key(_, Key).
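
The holder test in needs_replicate/2 can be tried against a toy ledger. In this sketch `toy_ledger/2` and `toy_needs_replicate/2` are hypothetical stand-ins for the real ledger, and the admin-key check is left out:

```prolog
% Toy ledger: toy_ledger(Key, Holders), where Holders is a node bitmap.
toy_ledger(a, 0b011).        % held by nodes 0 and 1
toy_ledger(b, 0b111).        % held by nodes 0, 1 and 2
toy_ledger(c, 0b001).        % held by node 0 only

% A key needs replication if some node in Nodes does not hold it yet.
toy_needs_replicate(Nodes, Key) :-
    toy_ledger(Key, Holders),
    Nodes /\ \Holders =\= 0.

% ?- findall(K, toy_needs_replicate(0b111, K), Ks).
% Ks = [a, c].
```

Key `b` is skipped because every node in the target bitmap already appears in its holder set.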


		 /*******************************
		 *      KEY CHANGE EVENTS	*
		 *******************************/
 paxos_on_change(?Term, :Goal) is det
 paxos_on_change(?Key, ?Value, :Goal) is det
 Executes the specified Goal when Key changes. paxos_on_change/2 listens for paxos_changed(Key,Value) notifications for Key, which are emitted as the result of successful paxos_set/3 transactions. When one is received for Key, Goal is executed in a separate thread of execution.
Arguments:
Term- is a compound, identical to that used for paxos_get/1.
Goal- is one of:
  • a callable atom or term, or
  • the atom ignore, which causes monitoring for Term to be discontinued.
paxos_on_change(Term, Goal) :-
    paxos_key(Term, Key),
    paxos_on_change(Key, Term, Goal).

paxos_on_change(Key, Value, Goal) :-
    Goal = _:Plain,
    must_be(callable, Plain),
    (   Plain == ignore
    ->  unlisten(paxos_user, paxos_changed(Key,Value))
    ;   listen(paxos_user, paxos_changed(Key,Value),
               key_changed(Key, Value, Goal)),
        paxos_initialize
    ).

key_changed(_Key, _Value, Goal) :-
    E = error(_,_),
    catch(thread_create(Goal, _, [detached(true)]),
          E, key_error(E)).

key_error(error(permission_error(create, thread, _), _)) :-
    !.
key_error(E) :-
    print_message(error, E).


		 /*******************************
		 *            HOOKS		*
		 *******************************/
 node(-Node) is det
Get the node ID for this paxos node.
 quorum(-Quorum) is det
 Get the current quorum as a bitmask.
 paxos_message(+PaxOS, +TimeOut, -BroadcastMessage) is det
 Transform a basic PaxOS message into a message for the broadcasting service. This predicate is hooked by paxos_message_hook/3 with the same signature.
Arguments:
 TimeOut- is either the atom - or a time in seconds.
paxos_message(Paxos:From, TMO, Message) :-
    paxos_message_hook(paxos(Paxos):From, TMO, Message),
    !.
paxos_message(Paxos, TMO, Message) :-
    paxos_message_hook(paxos(Paxos), TMO, Message),
    !.
paxos_message(Paxos, TMO, Message) :-
    throw(error(mode_error(det, fail,
                           paxos:paxos_message_hook(Paxos, TMO, Message)), _)).


		 /*******************************
		 *           STORAGE		*
		 *******************************/

:- dynamic
    paxons_ledger/4.                    % Key, Gen, Value, Holders
 ledger_current(?Key, ?Gen, ?Value, ?Holders) is nondet
True when Key is a known key in my ledger.
ledger_current(Key, Gen, Value, Holders) :-
    paxons_ledger(Key, Gen, Value, Holders),
    valid(Holders).
 ledger(+Key, -Gen, -Value) is semidet
True if the ledger has Value associated with Key at generation Gen. Note that if the value is not yet acknowledged by the leader we should not use it.
ledger(Key, Gen, Value) :-
    paxons_ledger(Key, Gen, Value0, Holders),
    valid(Holders),
    !,
    Value = Value0.
 ledger_create(+Key, +Gen, +Value) is det
Create a new Key-Value pair at generation Gen. This is executed during the preparation phase.
ledger_create(Key, Gen, Value) :-
    get_time(Now),
    asserta(paxons_ledger(Key, Gen, Value, created(Now))).
 ledger_update(+Key, +Gen, +Value) is semidet
Update Key to Value if the current generation is older than Gen. This reflects the accept phase of the protocol.
ledger_update(Key, Gen, Value) :-
    paxons_ledger(Key, Gen0, _Value, _Holders),
    !,
    Gen > Gen0,
    get_time(Now),
    asserta(paxons_ledger(Key, Gen, Value, accepted(Now))),
    (   Gen0 == 0
    ->  retractall(paxons_ledger(Key, Gen0, _, _))
    ;   true
    ).
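
The generation check in ledger_update/3 can be illustrated with a stripped-down store. This is only a sketch: `toy_entry/3` and `toy_update/3` are hypothetical names, and the timestamps and cleanup of generation 0 are omitted:

```prolog
:- dynamic toy_entry/3.          % Key, Gen, Value

% Accept Value for Key only if Gen is newer than the most recent
% stored generation. asserta/1 keeps the newest entry first.
toy_update(Key, Gen, Value) :-
    toy_entry(Key, Gen0, _),
    !,
    Gen > Gen0,
    asserta(toy_entry(Key, Gen, Value)).

% ?- asserta(toy_entry(k, 1, old)),
%    toy_update(k, 2, new),          % succeeds: 2 > 1
%    \+ toy_update(k, 2, newer),     % fails: generation is not newer
%    once(toy_entry(k, G, V)).
% G = 2, V = new.
```

As in the original, the update fails when the key is unknown, because the first goal finds no existing entry.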
 ledger_update_holders(+Key, +Gen, +Holders) is det
The leader acknowledged that Key@Gen represents a valid new
ledger_update_holders(Key, Gen, Holders) :-
    clause(paxons_ledger(Key, Gen, Value, Holders0), true, Ref),
    !,
    (   Holders0 == Holders
    ->  true
    ;   asserta(paxons_ledger(Key, Gen, Value, Holders)),
        erase(Ref)
    ),
    clean_key(Holders0, Key, Gen).

clean_key(Holders, _Key, _Gen) :-
    valid(Holders),
    !.
clean_key(_, Key, Gen) :-
    (   clause(paxons_ledger(Key, Gen0, _Value, _Holders0), true, Ref),
        Gen0 < Gen,
        erase(Ref),
        fail
    ;   true
    ).
 ledger_learn(+Key, +Gen, +Value) is semidet
We received a learn event.
ledger_learn(Key,Gen,Value) :-
    paxons_ledger(Key, Gen0, Value0, _Holders),
    !,
    (   Gen == Gen0,
        Value == Value0
    ->  true
    ;   Gen > Gen0
    ->  get_time(Now),
        asserta(paxons_ledger(Key, Gen, Value, learned(Now)))
    ).
ledger_learn(Key,Gen,Value) :-
    get_time(Now),
    asserta(paxons_ledger(Key, Gen, Value, learned(Now))).
 ledger_forget(+Nodes) is det
Remove Nodes from all ledgers. This is executed in a background thread.
ledger_forget(Nodes) :-
    catch(thread_create(ledger_forget_threaded(Nodes), _,
                        [ detached(true)
                        ]),
          error(permission_error(create, thread, _), _),
          true).

ledger_forget_threaded(Nodes) :-
    debug(paxos(node), 'Forgetting 0x~16r', [Nodes]),
    forall(ledger_current(Key, Gen, _Value, Holders),
           ledger_forget(Nodes, Key, Gen, Holders)),
    debug(paxos(node), 'Forgotten 0x~16r', [Nodes]).

ledger_forget(Nodes, Key, Gen, Holders) :-
    NewHolders is Holders /\ \Nodes,
    (   NewHolders \== Holders,
        ledger_update_holders(Key, Gen, NewHolders)
    ->  true
    ;   true
    ).

valid(Holders) :-
    integer(Holders).


		 /*******************************
		 *             UTIL		*
		 *******************************/
 c_element(+NewList, +Old, -Value)
 A Muller c-element is a logic block used in asynchronous logic. Its output assumes the value of its inputs iff all of its inputs are identical. Otherwise, the output retains its original value.
c_element([New | More], _Old, New) :-
    forall(member(N, More), N == New),
    !.
c_element(_List, Old, Old).
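
A short sketch of the c-element behaviour, using a renamed copy (`c_element_demo/3` is a hypothetical name) so the block loads on its own:

```prolog
:- use_module(library(lists)).   % member/2

% Renamed copy of c_element/3 as shown above.
c_element_demo([New | More], _Old, New) :-
    forall(member(N, More), N == New),
    !.
c_element_demo(_List, Old, Old).

% All inputs agree: the output follows the inputs.
% ?- c_element_demo([1, 1, 1], 0, V).
% V = 1.
%
% Inputs disagree: the output keeps the old value.
% ?- c_element_demo([1, 0, 1], 0, V).
% V = 0.
%
% No inputs at all: the old value is kept as well.
% ?- c_element_demo([], old, V).
% V = old.
```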
 arg_union(+Arg, +ListOfTerms, -Set) is det
Get all the nth args from ListOfTerms and do a set union on the result.
arg_union(Arg, NodeStatusList, Set) :-
    maplist(arg(Arg), NodeStatusList, Sets),
    list_union(Sets, Set).
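
Since the sets here are node bitmaps, the "union" is a bitwise OR. The sketch below shows the idea with a hypothetical `demo_arg_union/3` that folds the OR with foldl/4 instead of the helper used in the source. The `ok/2` terms are made-up examples:

```prolog
:- use_module(library(apply)).   % maplist/3, foldl/4
:- use_module(library(yall)).    % lambda expressions

% Collect argument Arg from each term and OR the bitmaps together.
demo_arg_union(Arg, Terms, Set) :-
    maplist(arg(Arg), Terms, Sets),
    foldl([H,S0,S]>>(S is S0 \/ H), Sets, 0, Set).

% ?- demo_arg_union(2, [ok(1, 0b001), ok(2, 0b100)], Set).
% Set = 5.
```

An empty list of terms yields 0, the empty bitmap.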

list_union(Sets, Set) :-
    list_union(Sets, 0, Set).

list_union([], Set, Set).
list_union([H|T], Set0, Set) :-
    Set1 is Set0 \/ H,
    list_union(T, Set1, Set).