/*  Part of SWI-Prolog

    Author:        Jeffrey Rosenwald, Jan Wielemaker
    E-mail:        jeffrose@acm.org
    WWW:           http://www.swi-prolog.org
    Copyright (c)  2009-2018, Jeffrey Rosenwald
                              CWI, Amsterdam
    All rights reserved.

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions
    are met:

    1. Redistributions of source code must retain the above copyright
       notice, this list of conditions and the following disclaimer.

    2. Redistributions in binary form must reproduce the above copyright
       notice, this list of conditions and the following disclaimer in
       the documentation and/or other materials provided with the
       distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
    POSSIBILITY OF SUCH DAMAGE.
*/

:- module(paxos,
          [ paxos_get/1,                        % ?Term
            paxos_get/2,                        % +Key, -Value
            paxos_get/3,                        % +Key, -Value, +Options
            paxos_set/1,                        % ?Term
            paxos_set/2,                        % +Key, +Value
            paxos_set/3,                        % +Key, +Value, +Options
            paxos_on_change/2,                  % ?Term, +Goal
            paxos_on_change/3,                  % ?Key, ?Value, +Goal

            paxos_initialize/1,                 % +Options
                                                % Hook support
            paxos_replicate_key/3               % +Nodes, ?Key, +Options
          ]).
:- use_module(library(broadcast)).
:- use_module(library(debug)).
:- use_module(library(lists)).
:- use_module(library(settings)).
:- use_module(library(option)).
:- use_module(library(error)).
:- use_module(library(apply)).
:- use_module(library(solution_sequences)).
:- meta_predicate
    paxos_on_change(?, 0),
    paxos_on_change(?, ?, 0).

:- multifile
    paxos_message_hook/3.               % +PaxOS, +TimeOut, -Message

:- setting(max_sets, nonneg, 20,
           "Max Retries to get to an agreement").
:- setting(max_gets, nonneg, 5,
           "Max Retries to get a value from the forum").
:- setting(response_timeout, float, 0.020,
           "Max time to wait for a response").
:- setting(replication_rate, number, 1000,
           "Number of keys replicated per second").
:- setting(death_half_life, number, 10,
           "Half-time for failure score").
:- setting(death_score, number, 100,
           "Failure score used to decide that a node is dead").
If paxos_initialize/1 is not called explicitly, the equivalent of paxos_initialize([]) is executed lazily as part of the first paxos operation. Defined options:

  - node(NodeID): NodeID must be a small non-negative integer as these identifiers are used in bitmaps.
:- dynamic  paxos_initialized/0.
:- volatile paxos_initialized/0.

paxos_initialize(_Options) :-
    paxos_initialized,
    !.
paxos_initialize(Options) :-
    with_mutex(paxos, paxos_initialize_sync(Options)).

paxos_initialize_sync(_Options) :-
    paxos_initialized,
    !.
paxos_initialize_sync(Options) :-
    at_halt(paxos_leave),
    listen(paxos, paxos(X), paxos_message(X)),
    paxos_assign_node(Options),
    start_replicator,
    asserta(paxos_initialized).

paxos_initialize :-
    paxos_initialize([]).


                 /*******************************
                 *            ADMIN             *
                 *******************************/
admin_key(quorum, '$paxos_quorum').
admin_key(dead,   '$paxos_dead_nodes').

paxos_get_admin(Name, Value) :-
    admin_key(Name, Key),
    paxos_get(Key, Value).

paxos_set_admin(Name, Value) :-
    admin_key(Name, Key),
    paxos_set(Key, Value).

paxos_set_admin_bg(Name, Value) :-
    thread_create(ignore(paxos_set_admin(Name, Value)), _,
                  [ detached(true)
                  ]).


                 /*******************************
                 *           NODE DATA          *
                 *******************************/
:- dynamic
    node/1,                             % NodeID
    quorum/1,                           % Bitmap
    failed/1,                           % Bitmap
    failed/3,                           % NodeID, LastTried, Score
    leaving/0,                          % Node is leaving
    dead/1,                             % Bitmap
    salt/1.                             % Unique key
:- volatile
    node/1,
    quorum/1,
    failed/1,
    failed/3,
    leaving/0,
    dead/1,
    salt/1.
paxos_assign_node(Options) :-
    (   option(node(Node), Options)
    ->  node(Node)
    ;   node(_)
    ),                                          % already done
    !.
paxos_assign_node(Options) :-
    between(1, 20, Retry),
    option(node(Node), Options, Node),
    (   node(_)
    ->  permission_error(set, paxos_node, Node)
    ;   true
    ),
    retractall(dead(_)),
    retractall(quorum(_)),
    retractall(failed(_)),
    retractall(failed(_,_,_)),
    retractall(leaving),
    Salt is random(1<<63),
    asserta(salt(Salt)),
    paxos_message(node(N,Q,D):From, 0.25, NodeQuery),
    findall(t(N,Q,D,From),
            broadcast_request(NodeQuery),
            Network),
    select(t(self,0,Salt,Me), Network, AllNodeStatus),
    partition(starting, AllNodeStatus, Starting, Running),
    nth_starting(Starting, Salt, Offset),
    retractall(salt(_)),
    debug(paxos(node), 'Me@~p; starting: ~p; running: ~p',
          [Me, Starting, Running]),
    arg_union(2, Running, Quorum),
    arg_union(3, Running, Dead),
    (   var(Node)
    ->  (   call_nth(( between(0, 1000, Node),
                       \+ memberchk(t(Node,_,_,_), Running),
                       Dead /\ (1<<Node) =:= 0),
                     Offset)
        ->  debug(paxos(node), 'Assigning myself node ~d', [Node])
        ;   resource_error(paxos_nodes)
        )
    ;   memberchk(t(Node,_,_,_), Running)
    ->  permission_error(set, paxos_node, Node)
    ;   Rejoin = true
    ),
    asserta(node(Node)),
    (   claim_node(Node, Me)
    ->  !,
        asserta(dead(Dead)),
        set_quorum(Node, Quorum),
        (   Rejoin == true
        ->  paxos_rejoin
        ;   true
        )
    ;   % one format directive per argument: node id and retry count
        debug(paxos(node), 'Node ~p already claimed; retrying (~p)',
              [Node, Retry]),
        retractall(node(Node)),
        fail
    ).

starting(t(self,_Quorum,_Salt,_Address)).

nth_starting(Starting, Salt, N) :-
    maplist(arg(3), Starting, Salts),
    sort([Salt|Salts], Sorted),
    nth1(N, Sorted, Salt),
    !.

claim_node(Node, Me) :-
    paxos_message(claim_node(Node, Ok):From, 0.25, NodeQuery),
    forall((   broadcast_request(NodeQuery),
               From \== Me,
               debug(paxos(node), 'Claim ~p ~p: ~p', [Node, From, Ok])
           ),
           Ok == true).

set_quorum(Node, Quorum0) :-
    Quorum is Quorum0 \/ (1<<Node),
    debug(paxos(node), 'Adding ~d to quorum (now 0x~16r)', [Node, Quorum]),
    asserta(quorum(Quorum)),
    paxos_set_admin(quorum, Quorum).
paxos_rejoin :-
    node(Node),
    repeat,
        paxos_get_admin(dead, Dead0),
        Dead is Dead0 /\ \(1<<Node),
        (   Dead == Dead0
        ->  true
        ;   paxos_set_admin(dead, Dead)
        ),
    !.
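The node sets used throughout this module (quorum, dead, failed) are integers in which bit N stands for node N. The following is a minimal sketch of the three bitmap operations the code relies on. The bitmap_* names are hypothetical helpers introduced for illustration only; they are not part of the library.

```prolog
% Hypothetical helpers, not part of library(paxos).
% A node set is an integer; bit Node is 1 iff Node is a member.

% bitmap_add(+Set0, +Node, -Set): set the bit for Node.
bitmap_add(Set0, Node, Set) :-
    Set is Set0 \/ (1<<Node).

% bitmap_del(+Set0, +Node, -Set): clear the bit for Node.
bitmap_del(Set0, Node, Set) :-
    Set is Set0 /\ \(1<<Node).

% bitmap_member(+Set, +Node): true if the bit for Node is set.
bitmap_member(Set, Node) :-
    Set /\ (1<<Node) =\= 0.
```

For instance, clearing node 2 from the set {0,1,2} (0b111) leaves {0,1} (0b011), which is the operation paxos_rejoin performs on the dead-node bitmap for its own node id.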
paxos_leave :-
    node(Node),
    !,
    asserta(leaving),
    paxos_leave(Node),
    Set is 1<<Node,
    paxos_message(forget(Set), -, Forget),
    broadcast(Forget),
    unlisten(paxos),
    retractall(leaving).
paxos_leave.

paxos_leave(Node) :-
    !,
    paxos_update_set(quorum, del(Node)),
    paxos_update_set(dead,   add(Node)).
paxos_leave(_).

paxos_update_set(Set, How) :-
    repeat,
      Term =.. [Set,Value],
      call(Term),
      (   How = add(Node)
      ->  NewValue is Value \/  (1<<Node)
      ;   How = del(Node)
      ->  NewValue is Value /\ \(1<<Node)
      ),
      (   Value == NewValue
      ->  true
      ;   paxos_set_admin(Set, NewValue)
      ),
    !.

                 /*******************************
                 *          NODE STATUS         *
                 *******************************/
update_failed(Action, Quorum, Alive) :-
    Failed is Quorum /\ \Alive,
    alive(Alive),
    consider_dead(Failed),
    (   failed(Failed)
    ->  true
    ;   (   clause(failed(_Old), true, Ref)
        ->  asserta(failed(Failed)),
            erase(Ref),
            debug(paxos(node), 'Updated failed quorum to 0x~16r', [Failed])
        ;   asserta(failed(Failed))
        ),
        (   Action == set
        ->  start_replicator
        ;   true
        )
    ).

consider_dead(0) :-
    !.
consider_dead(Failed) :-
    Node is lsb(Failed),
    consider_dead1(Node),
    Rest is Failed /\ \(1<<Node),
    consider_dead(Rest).

consider_dead1(Node) :-
    clause(failed(Node, Last, Score), true, Ref),
    !,
    setting(death_half_life, HalfLife),
    setting(death_score, DeathScore),
    get_time(Now),
    Passed is Now-Last,
    NewScore is Score*(2**(-Passed/HalfLife)) + 10,
    asserta(failed(Node, Now, NewScore)),
    erase(Ref),
    (   NewScore < DeathScore
    ->  debug(paxos(node), 'Consider node ~d dead', [Node]),
        paxos_leave(Node)
    ;   true
    ).
consider_dead1(Node) :-
    get_time(Now),
    asserta(failed(Node, Now, 10)).

alive(Bitmap) :-
    (   clause(failed(Node, _Last, _Score), true, Ref),
        Bitmap /\ (1<<Node) =\= 0,
        erase(Ref),
        fail
    ;   true
    ).
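The failure score kept in failed/3 decays exponentially with the time since the previous failure and gains 10 points on every new failure. The sketch below isolates that arithmetic so it can be tried by hand. demo_failure_score/4 is a hypothetical name used only for illustration, and the half-life is passed as an argument instead of being read from the death_half_life setting.

```prolog
% Hypothetical helper, not part of library(paxos).
% demo_failure_score(+Score, +Passed, +HalfLife, -NewScore)
% Score is the previous failure score, Passed the number of seconds
% since it was recorded.  The old score is halved for every HalfLife
% seconds that passed and the new failure then adds 10.
demo_failure_score(Score, Passed, HalfLife, NewScore) :-
    NewScore is Score*(2**(-Passed/HalfLife)) + 10.
```

With a half-life of 10 seconds, a score of 10 recorded 10 seconds ago becomes 10*0.5 + 10 = 15, while two failures with no time in between give 10 + 10 = 20. Failures spaced exactly one half-life apart converge to 20, the fixed point of S = S/2 + 10.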
life_quorum(Quorum, LifeQuorum) :-
    quorum(Quorum),
    (   failed(Failed),
        Failed \== 0,
        LifeQuorum is Quorum /\ \Failed,
        majority(LifeQuorum, Quorum)
    ->  true
    ;   LifeQuorum = Quorum
    ).


                 /*******************************
                 *        NETWORK STATUS        *
                 *******************************/

:- admin_key(quorum, Key),
   listen(paxos_changed(Key, Quorum),
          update_quorum(Quorum)).
:- admin_key(dead, Key),
   listen(paxos_changed(Key, Death),
          update_dead(Death)).

update_quorum(Proposed) :-
    debug(paxos(node), 'Received quorum proposal 0x~16r', [Proposed]),
    quorum(Proposed),
    !.
update_quorum(Proposed) :-
    leaving,
    !,
    update(quorum(Proposed)).
update_quorum(Proposed) :-
    node(Node),
    Proposed /\ (1<<Node) =\= 0,
    !,
    update(quorum(Proposed)).
update_quorum(Proposed) :-
    node(Node),
    NewQuorum is Proposed \/ (1<<Node),
    update(quorum(NewQuorum)),
    debug(paxos(node), 'I''m not in the quorum! Proposing 0x~16r', [NewQuorum]),
    paxos_set_admin_bg(quorum, NewQuorum).

update_dead(Proposed) :-
    debug(paxos(node), 'Received dead proposal 0x~16r', [Proposed]),
    dead(Proposed),
    !.
update_dead(Proposed) :-
    leaving,
    !,
    update(dead(Proposed)).
update_dead(Proposed) :-
    node(Node),
    Proposed /\ (1<<Node) =:= 0,
    !,
    update(dead(Proposed)).
update_dead(Proposed) :-
    node(Node),
    NewDead is Proposed /\ \(1<<Node),
    update(dead(NewDead)),
    paxos_set_admin_bg(dead, NewDead).

update(Clause) :-
    functor(Clause, Name, Arity),
    functor(Generic, Name, Arity),
    (   clause(Generic, true, Ref)
    ->  asserta(Clause),
        erase(Ref)
    ;   asserta(Clause)
    ).

                 /*******************************
                 *        INBOUND EVENTS        *
                 *******************************/
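Handle inbound messages from our peers. A prepare request returns the generation at which we have a value, or 0 for Gen, and our node id for Node. An accept or learn request that is rejected is answered with nack.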
paxos_message(prepare(Key,Node,Gen,Value)) :-
    node(Node),
    (   ledger(Key, Gen, _)
    ->  true
    ;   Gen = 0,
        ledger_create(Key, Gen, Value)
    ),
    debug(paxos, 'Prepared ~p-~p@~d', [Key,Value,Gen]).
paxos_message(accept(Key,Node,Gen,GenA,Value)) :-
    node(Node),
    (   ledger_update(Key, Gen, Value)
    ->  debug(paxos, 'Accepted ~p-~p@~d', [Key,Value,Gen]),
        GenA = Gen
    ;   debug(paxos, 'Rejected ~p-~p@~d', [Key,Value,Gen]),
        GenA = nack
    ).
paxos_message(changed(Key,Gen,Value,Acceptors)) :-
    debug(paxos, 'Changed ~p-~p@~d for ~p', [Key, Value, Gen, Acceptors]),
    ledger_update_holders(Key,Gen,Acceptors),
    broadcast(paxos_changed(Key,Value)).
paxos_message(learn(Key,Node,Gen,GenA,Value)) :-
    node(Node),
    debug(paxos, 'Learn ~p-~p@~p?', [Key, Value, Gen]),
    (   ledger_learn(Key,Gen,Value)
    ->  debug(paxos, 'Learned ~p-~p@~d', [Key,Value,Gen]),
        GenA = Gen
    ;   debug(paxos, 'Rejected ~p@~d', [Key, Gen]),
        GenA = nack
    ).
paxos_message(learned(Key,Gen,_Value,Acceptors)) :-
    ledger_update_holders(Key,Gen,Acceptors).
paxos_message(retrieve(Key,Node,K,Value)) :-
    node(Node),
    debug(paxos, 'Retrieving ~p', [Key]),
    ledger(Key,K,Value),
    debug(paxos, 'Retrieved ~p-~p@~d', [Key,Value,K]),
    !.
paxos_message(forget(Nodes)) :-
    ledger_forget(Nodes).
paxos_message(node(Node,Quorum,Dead)) :-
    (   node(Node),
        quorum(Quorum),
        dead(Dead)
    ->  true
    ;   salt(Salt),
        Node = self,
        Quorum = 0,
        Dead = Salt
    ).
paxos_message(claim_node(Node, Ok)) :-
    (   node(Node)
    ->  Ok = false
    ;   Ok = true
    ).


                 /*******************************
                 *     KEY-VALUE OPERATIONS     *
                 *******************************/
paxos_set/1 is equivalent to paxos_key(Term,Key), paxos_set(Key,Term). I.e., Term is a ground compound term and its key is the name/arity pair. This version provides compatibility with older versions of this library.

On success, paxos_set/1 will also broadcast the term paxos(changed(Key,Value)) to the quorum.
Options processed:

  - retry(Retries): number of retries performed before a set is abandoned. Defaults to the setting max_sets (20).
  - timeout(Seconds): maximum time to wait for a response. Defaults to the setting response_timeout (0.020, 20ms).

paxos_set(Term) :-
    paxos_key(Term, Key),
    paxos_set(Key, Term, []).

paxos_set(Key, Value) :-
    paxos_set(Key, Value, []).

paxos_set(Key, Value, Options) :-
    must_be(ground, Key-Value),
    paxos_initialize,
    option(retry(Retries), Options, Retries),
    option(timeout(TMO), Options, TMO),
    apply_default(Retries, max_sets),
    apply_default(TMO, response_timeout),
    paxos_message(prepare(Key,Np,Rp,Value), TMO, Prepare),
    between(0, Retries, _),
      life_quorum(Quorum, Alive),
      debug(paxos, 'Set: ~p -> ~p', [Key, Value]),
      collect(Quorum, false, Np, Rp, Prepare, Rps, PrepNodes),
      debug(paxos, 'Set: quorum: 0x~16r, prepared by 0x~16r, gens ~p',
            [Quorum, PrepNodes, Rps]),
      majority(PrepNodes, Quorum),
      max_list(Rps, K),
      succ(K, K1),
      paxos_message(accept(Key,Na,K1,Ra,Value), TMO, Accept),
      collect(Alive, Ra == nack, Na, Ra, Accept, Ras, AcceptNodes),
      majority(AcceptNodes, Quorum),
      intersecting(PrepNodes, AcceptNodes),
      c_element(Ras, K, K1),
      broadcast(paxos(log(Key,Value,AcceptNodes,K1))),
      paxos_message(changed(Key,K1,Value,AcceptNodes), -, Changed),
      broadcast(Changed),
      update_failed(set, Quorum, AcceptNodes),
    !.

apply_default(Var, Setting) :-
    var(Var),
    !,
    setting(Setting, Var).
apply_default(_, _).

majority(SubSet, Set) :-
    popcount(SubSet) >= (popcount(Set)+2)//2.

intersecting(Set1, Set2) :-
    Set1 /\ Set2 =\= 0.
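The majority test above counts set bits, so it can be checked in isolation. The sketch below copies the same arithmetic under the hypothetical name demo_majority/2, so that it does not clash with the library predicate, and is meant only to make the threshold concrete.

```prolog
% Hypothetical copy of the majority test, for experimentation only.
% demo_majority(+SubSet, +Set): SubSet holds more than half of the
% number of members of Set.  Both are integer bitmaps.
demo_majority(SubSet, Set) :-
    popcount(SubSet) >= (popcount(Set)+2)//2.
```

For a quorum of three nodes (0b111) the threshold is (3+2)//2 = 2, so two repliers suffice and one does not. For four nodes the threshold is 3, so an even split is rejected. A quorum of one is its own majority.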
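collect/7 performs a broadcast request using Message. Result is the list of instantiations received for Template and NodeSet is the bitmap of the nodes that replied, i.e. the size of NodeSet is length(Result). The transfer stops if all members of the set Quorum responded or the configured timeout passed.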
collect(Quorum, Stop, Node, Template, Message, Result, NodeSet) :-
    State = state(0),
    L0 = [dummy|_],
    Answers = list(L0),
    (   broadcast_request(Message),
        (   Stop                        % e.g. `Ra == nack`: abort on reject
        ->  !,
            fail
        ;   true
        ),
        duplicate_term(Template, Copy),
        NewLastCell = [Copy|_],
        arg(1, Answers, LastCell),
        nb_linkarg(2, LastCell, NewLastCell),
        nb_linkarg(1, Answers, NewLastCell),
        arg(1, State, Replied0),
        Replied is Replied0 \/ (1<<Node),
        nb_setarg(1, State, Replied),
        Quorum /\ Replied =:= Quorum
    ->  true
    ;   true
    ),
    arg(1, State, NodeSet),
    arg(1, Answers, [_]),               % close the answer list
    L0 = [_|Result].
paxos_get/1 is equivalent to paxos_key(Term,Key), paxos_get(Key,Term). I.e., Term is a compound term and its key is the name/arity pair. This version provides compatibility with older versions of this library.

Options processed:
  - retry(Retries): number of retries performed before a get is abandoned. Defaults to the setting max_gets (5).
  - timeout(Seconds): maximum time to wait for a response. Defaults to the setting response_timeout (0.020, 20ms).

paxos_get(Term) :-
    paxos_key(Term, Key),
    paxos_get(Key, Term, []).

paxos_get(Key, Value) :-
    paxos_get(Key, Value, []).

paxos_get(Key, Value, _) :-
    ledger(Key, _Line, Value),
    !.
paxos_get(Key, Value, Options) :-
    paxos_initialize,
    option(retry(Retries), Options, Retries),
    option(timeout(TMO), Options, TMO),
    apply_default(Retries, max_gets),
    apply_default(TMO, response_timeout),
    Msg = Line-Value,
    paxos_message(retrieve(Key,Nr,Line,Value), TMO, Retrieve),
    node(Node),
    between(0, Retries, _),
      life_quorum(Quorum, Alive),
      QuorumA is Alive /\ \(1<<Node),
      collect(QuorumA, false, Nr, Msg, Retrieve, Terms, RetrievedNodes),
      debug(paxos, 'Retrieved: ~p from 0x~16r', [Terms, RetrievedNodes]),
      highest_vote(Terms, _Line-MajorityValue, Count),
      debug(paxos, 'Best: ~p with ~d votes', [MajorityValue, Count]),
      Count >= (popcount(QuorumA)+2)//2,
      debug(paxos, 'Retrieve: accept ~p', [MajorityValue]),
      update_failed(get, Quorum, RetrievedNodes),
      paxos_set(Key, MajorityValue),            % Is this needed?
    !.

highest_vote(Terms, Term, Count) :-
    msort(Terms, Sorted),
    count_votes(Sorted, Counted),
    sort(1, >, Counted, [Count-Term|_]).

count_votes([], []).
count_votes([H|T0], [N-H|T]) :-
    count_same(H, T0, 1, N, R),
    count_votes(R, T).

count_same(H, [Hc|T0], C0, C, R) :-
    H == Hc,
    !,
    C1 is C0+1,
    count_same(H, T0, C1, C, R).
count_same(_, R, C, C, R).
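The vote counting used by paxos_get/3 can be tried without a network. The sketch below is a self-contained variant under hypothetical demo_* names. It keeps the msort/2 and run-length counting steps shown above and, as an assumption of this sketch, orders the counted pairs with the documented sort/4 order @>=.

```prolog
% Hypothetical, self-contained variant of the vote counting above.
% demo_highest_vote(+Terms, -Term, -Count): Term is the answer that
% occurs most often in Terms and Count is its number of occurrences.
demo_highest_vote(Terms, Term, Count) :-
    msort(Terms, Sorted),               % equal answers become adjacent
    demo_count_votes(Sorted, Counted),  % run-length encode as N-Answer
    sort(1, @>=, Counted, [Count-Term|_]).

demo_count_votes([], []).
demo_count_votes([H|T0], [N-H|T]) :-
    demo_count_same(H, T0, 1, N, R),
    demo_count_votes(R, T).

demo_count_same(H, [Hc|T0], C0, C, R) :-
    H == Hc,
    !,
    C1 is C0+1,
    demo_count_same(H, T0, C1, C, R).
demo_count_same(_, R, C, C, R).
```

Given the replies [3-a, 2-b, 3-a], each of the form Generation-Value, the answer 3-a wins with two votes. paxos_get/3 then still requires that this count is a majority of the queried nodes.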
The key of a compound term is a term '$c'(Name,Arity). Note that we do not use Name/Arity because X/Y is naturally used to organize keys as hierarchical paths.

paxos_key(Compound, '$c'(Name,Arity)) :-
    compound(Compound), !,
    compound_name_arity(Compound, Name, Arity).
paxos_key(Compound, _) :-
    must_be(compound, Compound).


                 /*******************************
                 *          REPLICATION         *
                 *******************************/
start_replicator :-
    catch(thread_send_message(paxos_replicator, run),
          error(existence_error(_,_),_),
          fail),
    !.
start_replicator :-
    catch(thread_create(replicator, _,
                        [ alias(paxos_replicator),
                          detached(true)
                        ]),
          error(permission_error(_,_,_),_),
          true).

replicator :-
    setting(replication_rate, ReplRate),
    ReplSleep is 1/ReplRate,
    node(Node),
    debug(paxos(replicate), 'Starting replicator', []),
    State = state(idle),
    repeat,
      quorum(Quorum),
      dead(Dead),
      LifeQuorum is Quorum /\ \Dead,
      (   LifeQuorum /\ \(1<<Node) =:= 0
      ->  debug(paxos(replicate),
                'Me: ~d, Quorum: 0x~16r, Dead: 0x~16r: I''m alone, waiting ...',
                [Node, Quorum, Dead]),
          thread_get_message(_)
      ;   (   paxos_replicate_key(LifeQuorum, Key, [])
          ->  replicated(State, key(Key)),
              thread_self(Me),
              thread_get_message(Me, _, [timeout(ReplSleep)])
          ;   replicated(State, idle),
              thread_get_message(_)
          )
      ),
      fail.

replicated(State, key(_Key)) :-
    arg(1, State, idle),
    !,
    debug(paxos(replicate), 'Start replicating ...', []),
    nb_setarg(1, State, 1).
replicated(State, key(_Key)) :-
    !,
    arg(1, State, C0),
    C is C0+1,
    nb_setarg(1, State, C).
replicated(State, idle) :-
    arg(1, State, idle),
    !.
replicated(State, idle) :-
    arg(1, State, Count),
    debug(paxos(replicate), 'Replicated ~D keys', [Count]),
    nb_setarg(1, State, idle).
Options processed by paxos_replicate_key/3:

  - timeout(Seconds): maximum time to wait for a response. Defaults to the setting response_timeout (0.020, 20ms).

paxos_replicate_key(Nodes, Key, Options) :-
    replication_key(Nodes, Key),
    option(timeout(TMO), Options, TMO),
    apply_default(TMO, response_timeout),
    ledger_current(Key, Gen, Value, Holders),
    paxos_message(learn(Key,Na,Gen,Ga,Value), TMO, Learn),
    collect(Nodes, Ga == nack, Na, Ga, Learn, _Gas, LearnedNodes),
    NewHolders is Holders \/ LearnedNodes,
    paxos_message(learned(Key,Gen,Value,NewHolders), -, Learned),
    broadcast(Learned),
    update_failed(replicate, Nodes, LearnedNodes).

replication_key(_Nodes, Key) :-
    ground(Key),
    !.
replication_key(Nodes, Key) :-
    (   Nth is 1+random(popcount(Nodes))
    ;   Nth = 1
    ),
    call_nth(needs_replicate(Nodes, Key), Nth),
    !.

needs_replicate(Nodes, Key) :-
    ledger_current(Key, _Gen, _Value, Holders),
    Nodes /\ \Holders =\= 0,
    \+ admin_key(_, Key).


                 /*******************************
                 *      KEY CHANGE EVENTS       *
                 *******************************/
paxos_on_change/3 listens for changed(Key,Value) notifications for Key, which are emitted as the result of successful paxos_set/3 transactions. When one is received for Key, Goal is executed in a separate thread of execution.
paxos_on_change(Term, Goal) :-
    paxos_key(Term, Key),
    paxos_on_change(Key, Term, Goal).

paxos_on_change(Key, Value, Goal) :-
    Goal = _:Plain,
    must_be(callable, Plain),
    (   Plain == ignore
    ->  unlisten(paxos_user, paxos_changed(Key,Value))
    ;   listen(paxos_user, paxos_changed(Key,Value),
               key_changed(Key, Value, Goal)),
        paxos_initialize
    ).

key_changed(_Key, _Value, Goal) :-
    E = error(_,_),
    catch(thread_create(Goal, _, [detached(true)]),
          E, key_error(E)).

key_error(error(permission_error(create, thread, _), _)) :-
    !.
key_error(E) :-
    print_message(error, E).


                 /*******************************
                 *            HOOKS             *
                 *******************************/
paxos_message(Paxos:From, TMO, Message) :-
    paxos_message_hook(paxos(Paxos):From, TMO, Message),
    !.
paxos_message(Paxos, TMO, Message) :-
    paxos_message_hook(paxos(Paxos), TMO, Message),
    !.
paxos_message(Paxos, TMO, Message) :-
    throw(error(mode_error(det, fail,
                           paxos:paxos_message_hook(Paxos, TMO, Message)), _)).


                 /*******************************
                 *           STORAGE            *
                 *******************************/

:- dynamic
    paxons_ledger/4.                    % Key, Gen, Value, Holders
ledger_current(Key, Gen, Value, Holders) :-
    paxons_ledger(Key, Gen, Value, Holders),
    valid(Holders).

ledger(Key, Gen, Value) :-
    paxons_ledger(Key, Gen, Value0, Holders),
    valid(Holders),
    !,
    Value = Value0.

ledger_create(Key, Gen, Value) :-
    get_time(Now),
    asserta(paxons_ledger(Key, Gen, Value, created(Now))).

ledger_update(Key, Gen, Value) :-
    paxons_ledger(Key, Gen0, _Value, _Holders),
    !,
    Gen > Gen0,
    get_time(Now),
    asserta(paxons_ledger(Key, Gen, Value, accepted(Now))),
    (   Gen0 == 0
    ->  retractall(paxons_ledger(Key, Gen0, _, _))
    ;   true
    ).
ledger_update_holders(Key, Gen, Holders) :-
    clause(paxons_ledger(Key, Gen, Value, Holders0), true, Ref),
    !,
    (   Holders0 == Holders
    ->  true
    ;   asserta(paxons_ledger(Key, Gen, Value, Holders)),
        erase(Ref)
    ),
    clean_key(Holders0, Key, Gen).

clean_key(Holders, _Key, _Gen) :-
    valid(Holders),
    !.
clean_key(_, Key, Gen) :-
    (   clause(paxons_ledger(Key, Gen0, _Value, _Holders0), true, Ref),
        Gen0 < Gen,
        erase(Ref),
        fail
    ;   true
    ).

ledger_learn(Key,Gen,Value) :-
    paxons_ledger(Key, Gen0, Value0, _Holders),
    !,
    (   Gen == Gen0,
        Value == Value0
    ->  true
    ;   Gen > Gen0
    ->  get_time(Now),
        asserta(paxons_ledger(Key, Gen, Value, learned(Now)))
    ).
ledger_learn(Key,Gen,Value) :-
    get_time(Now),
    asserta(paxons_ledger(Key, Gen, Value, learned(Now))).

ledger_forget(Nodes) :-
    catch(thread_create(ledger_forget_threaded(Nodes), _,
                        [ detached(true)
                        ]),
          error(permission_error(create, thread, _), _),
          true).

ledger_forget_threaded(Nodes) :-
    debug(paxos(node), 'Forgetting 0x~16r', [Nodes]),
    forall(ledger_current(Key, Gen, _Value, Holders),
           ledger_forget(Nodes, Key, Gen, Holders)),
    debug(paxos(node), 'Forgotten 0x~16r', [Nodes]).

ledger_forget(Nodes, Key, Gen, Holders) :-
    NewHolders is Holders /\ \Nodes,
    (   NewHolders \== Holders,
        ledger_update_holders(Key, Gen, NewHolders)
    ->  true
    ;   true
    ).

valid(Holders) :-
    integer(Holders).


                 /*******************************
                 *             UTIL             *
                 *******************************/
c_element([New | More], _Old, New) :-
    forall(member(N, More), N == New),
    !.
c_element(_List, Old, Old).
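c_element/3 yields the new value only when every reply agrees, and otherwise keeps the old one. The sketch below copies it under the hypothetical name demo_c_element/3 so the behaviour can be checked interactively. Reading it as the unanimity check on the accept replies in paxos_set/3 is an interpretation of the call site, not a statement from the library documentation.

```prolog
:- use_module(library(lists)).          % member/2

% Hypothetical copy of c_element/3, for experimentation only.
% demo_c_element(+Replies, +Old, -Value): Value is the first reply if
% all replies are identical (==) to it, and Old otherwise.
demo_c_element([New|More], _Old, New) :-
    forall(member(N, More), N == New),
    !.
demo_c_element(_List, Old, Old).
```

With replies [4,4,4] and old generation 3 the result is 4, while a single nack in [4,nack,4] makes the result fall back to 3. Calling it with the third argument already bound to the new generation, as paxos_set/3 does, therefore fails unless the replies are unanimous.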
arg_union(Arg, NodeStatusList, Set) :-
    maplist(arg(Arg), NodeStatusList, Sets),
    list_union(Sets, Set).

list_union(Sets, Set) :-
    list_union(Sets, 0, Set).

list_union([], Set, Set).
list_union([H|T], Set0, Set) :-
    Set1 is Set0 \/ H,
    list_union(T, Set1, Set).
A Replicated Data Store
This module provides a replicated data store that is coordinated using a variation on Lamport's Paxos consensus protocol. The original method is described in his paper entitled "The Part-time Parliament", which was published in 1998. The algorithm is tolerant of non-Byzantine failure. That is, it tolerates late or lost delivery or reply, but not senseless delivery or reply. The present algorithm takes advantage of the convenience offered by multicast to the quorum's membership, who can remain anonymous and who can come and go as they please without affecting Liveness or Safety properties.
Paxos' quorum is a set of one or more attentive members, whose processes respond to queries within some known time limit (< 20ms), which includes roundtrip delivery delay. This property is easy to satisfy given that every coordinator is necessarily a member of the quorum as well, and a quorum of one is permitted. An inattentive member (e.g. one whose actions are late or lost) is deemed to be "not-present" for the purposes of the present transaction and consistency cannot be assured for that member. As long as there is at least one attentive member of the quorum, then persistence of the database is assured.
Each member maintains a ledger of terms along with information about when they were originally recorded. The member's ledger is deterministic. That is to say that there can only be one entry per functor/arity combination. No member will accept a new term proposal that has a line number that is equal-to or lower-than the one that is already recorded in the ledger.
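The acceptance rule described above can be sketched with a small in-memory ledger. Everything in the block below (demo_ledger/3, demo_accept/3) is hypothetical and introduced only for illustration. It keeps a single entry per key and calls the line number a generation, as the code in this module does.

```prolog
:- dynamic demo_ledger/3.               % Key, Gen, Value (hypothetical)

% demo_accept(+Key, +Gen, +Value) is semidet.
% Record Value for Key at generation Gen.  Fails, leaving the ledger
% unchanged, if an entry for Key exists whose generation is equal to
% or higher than Gen.
demo_accept(Key, Gen, Value) :-
    (   demo_ledger(Key, Gen0, _)
    ->  Gen > Gen0,
        retractall(demo_ledger(Key, _, _))
    ;   true
    ),
    assertz(demo_ledger(Key, Gen, Value)).
```

Starting from an empty ledger, demo_accept(k, 1, a) succeeds, a second proposal at generation 1 is refused, and a proposal at generation 2 replaces the entry.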
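Paxos is a three-phase protocol: the coordinator first asks the quorum to prepare for a new proposal, then asks it to accept the proposed value at the next line number, and finally broadcasts the agreed change to the quorum.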
For practical reasons, we rely on the partially synchronous behavior (e.g. limited upper time bound for replies) of broadcast_request/1 over TIPC to ensure Progress. Perhaps more importantly, we rely on the fact that the TIPC broadcast listener state machine guarantees the atomicity of broadcast_request/1 at the process level, thus obviating the need for external mutual exclusion mechanisms.
Note that this algorithm does not guarantee the rightness of the value proposed. It only guarantees that if successful, the value proposed is identical for all attentive members of the quorum.
See also: tipc_broadcast.pl, udp_broadcast.pl