35
36:- module(paxos,
37 [ paxos_get/1, 38 paxos_get/2, 39 paxos_get/3, 40 paxos_set/1, 41 paxos_set/2, 42 paxos_set/3, 43 paxos_on_change/2, 44 paxos_on_change/3, 45
46 paxos_initialize/1, 47 48 paxos_replicate_key/3 49 ]). 50:- use_module(library(broadcast)). 51:- use_module(library(debug)). 52:- use_module(library(lists)). 53:- use_module(library(settings)). 54:- use_module(library(option)). 55:- use_module(library(error)). 56:- use_module(library(apply)). 57:- use_module(library(solution_sequences)). 58
128
129:- meta_predicate
130 paxos_on_change(?, 0),
131 paxos_on_change(?, ?, 0). 132
133:- multifile
134 paxos_message_hook/3. 135
% Tunable parameters, registered through library(settings).
%
%   - max_sets / max_gets bound the retry loops of paxos_set/3 and
%     paxos_get/3.
%   - response_timeout is the default timeout handed to the message
%     hook.
%   - replication_rate determines the pause between two replicated
%     keys in the replicator thread (1/Rate seconds).
%   - death_half_life and death_score parameterise the decaying
%     failure score maintained by consider_dead1/1.

:- setting(max_sets, nonneg, 20,
           "Max Retries to get to an agreement").
:- setting(max_gets, nonneg, 5,
           "Max Retries to get a value from the forum").
:- setting(response_timeout, float, 0.020,
           "Max time to wait for a response").
:- setting(replication_rate, number, 1000,
           "Number of keys replicated per second").
:- setting(death_half_life, number, 10,
           "Half-time for failure score").
:- setting(death_score, number, 100,
           "Failure score threshold for declaring a node dead").
149
168
169:- dynamic paxos_initialized/0. 170:- volatile paxos_initialized/0. 171
%!  paxos_initialize(+Options)
%
%   Initialise this node once.  When paxos_initialized/0 already holds
%   nothing is done.  Otherwise the actual work runs in
%   paxos_initialize_sync/1 while holding the mutex `paxos`.

paxos_initialize(Options) :-
    (   paxos_initialized
    ->  true
    ;   with_mutex(paxos, paxos_initialize_sync(Options))
    ).

%   The flag is tested a second time because another thread may have
%   completed the initialisation while we were waiting for the mutex.
%   Initialisation registers paxos_leave/0 as halt hook, subscribes to
%   paxos(_) broadcasts, assigns the node id and starts the replicator
%   before the flag is asserted.

paxos_initialize_sync(Options) :-
    (   paxos_initialized
    ->  true
    ;   at_halt(paxos_leave),
        listen(paxos, paxos(Msg), paxos_message(Msg)),
        paxos_assign_node(Options),
        start_replicator,
        asserta(paxos_initialized)
    ).

%   Initialise with an empty option list.

paxos_initialize :-
    paxos_initialize([]).
190
191
192 195
201
%!  admin_key(?Name, ?Key)
%
%   Maps the symbolic name of an administrative value onto the Paxos
%   key under which it is stored.

admin_key(quorum, '$paxos_quorum').
admin_key(dead,   '$paxos_dead_nodes').

%   Read the administrative value Name through paxos_get/2.

paxos_get_admin(Name, Value) :-
    admin_key(Name, AdminKey),
    paxos_get(AdminKey, Value).

%   Store the administrative value Name through paxos_set/2.

paxos_set_admin(Name, Value) :-
    admin_key(Name, AdminKey),
    paxos_set(AdminKey, Value).

%   As paxos_set_admin/2, but run in a detached thread.  The goal is
%   wrapped in ignore/1, so failure of the update is not reported.

paxos_set_admin_bg(Name, Value) :-
    Goal = ignore(paxos_set_admin(Name, Value)),
    thread_create(Goal, _Id, [detached(true)]).
217
218
219 222
240
241:- dynamic
242 node/1, 243 quorum/1, 244 failed/1, 245 failed/3, 246 leaving/0, 247 dead/1, 248 salt/1. 249:- volatile
250 node/1,
251 quorum/1,
252 failed/1,
253 failed/3,
254 leaving/0,
255 dead/1,
256 salt/1. 257
263
%!  paxos_assign_node(+Options)
%
%   Establish the identity (node/1) of this process.
%
%   The first clause succeeds immediately when the identity is already
%   settled: either the requested node(Node) option matches the
%   current node/1 fact, or no node was requested and some node/1 fact
%   exists.
%
%   The second clause tries up to 20 times to obtain an identity:
%
%     1. Raise a permission error if a (different) node id is already
%        set.
%     2. Reset the local quorum/dead/failed/leaving state and publish
%        a random salt.
%     3. Broadcast a node(N,Q,D) query and collect the replies.  Our
%        own reply must be present as t(self,0,Salt,Me); it tells us
%        our own address Me.
%     4. Split the remaining replies into nodes that are also starting
%        and running nodes.  Our rank among the starting salts
%        (Offset) selects which free node id we pick, so that nodes
%        starting simultaneously tend to pick different ids.
%     5. Claim the id using claim_node/2.  On success the clause
%        commits, records the dead set, joins the quorum and, when an
%        explicitly requested id was used, calls paxos_rejoin/0.  On
%        failure the node/1 fact is withdrawn and the next retry
%        starts.
%
%   Options processed: node(Node).
%
%   @error permission_error(set, paxos_node, Node) if a node id is
%          already set or the requested id is used by a running node.
%   @error resource_error(paxos_nodes) if no free id could be found.

paxos_assign_node(Options) :-
    (   option(node(Node), Options)
    ->  node(Node)
    ;   node(_)
    ),
    !.
paxos_assign_node(Options) :-
    between(1, 20, Retry),
    option(node(Node), Options, Node),
    (   node(_)
    ->  permission_error(set, paxos_node, Node)
    ;   true
    ),
    retractall(dead(_)),
    retractall(quorum(_)),
    retractall(failed(_)),
    retractall(failed(_,_,_)),
    retractall(leaving),
    Salt is random(1<<63),
    asserta(salt(Salt)),
    paxos_message(node(N,Q,D):From, 0.25, NodeQuery),
    findall(t(N,Q,D,From),
            broadcast_request(NodeQuery),
            Network),
    % Our own answer carries `self` and our salt; it reveals our address.
    select(t(self,0,Salt,Me), Network, AllNodeStatus),
    partition(starting, AllNodeStatus, Starting, Running),
    nth_starting(Starting, Salt, Offset),
    retractall(salt(_)),
    debug(paxos(node), 'Me@~p; starting: ~p; running: ~p',
          [Me, Starting, Running]),
    arg_union(2, Running, Quorum),
    arg_union(3, Running, Dead),
    (   var(Node)
    ->  (   call_nth(( between(0, 1000, Node),
                       \+ memberchk(t(Node,_,_,_), Running),
                       Dead /\ (1<<Node) =:= 0),
                     Offset)
        ->  debug(paxos(node), 'Assigning myself node ~d', [Node])
        ;   resource_error(paxos_nodes)
        )
    ;   memberchk(t(Node,_,_,_), Running)
    ->  permission_error(set, paxos_node, Node)
    ;   Rejoin = true
    ),
    asserta(node(Node)),
    (   claim_node(Node, Me)
    ->  !,
        asserta(dead(Dead)),
        set_quorum(Node, Quorum),
        (   Rejoin == true
        ->  paxos_rejoin
        ;   true
        )
    ;   % The format string used to hold a single ~p for two arguments.
        debug(paxos(node), 'Node ~p already claimed; retrying (~p)',
              [Node, Retry]),
        retractall(node(Node)),
        fail
    ).
321
%   True for a node-status term whose node field is `self`, which is
%   what a node without an assigned id answers to a node query.

starting(t(self,_Quorum,_Salt,_Address)).

%!  nth_starting(+Starting, +Salt, -N)
%
%   N is the 1-based position of Salt in the sorted set made of Salt
%   and the third argument of each element of Starting.

nth_starting(Starting, Salt, N) :-
    maplist(arg(3), Starting, OtherSalts),
    sort([Salt|OtherSalts], Ordered),
    once(nth1(N, Ordered, Salt)).
329
%!  claim_node(+Node, +Me)
%
%   Broadcast a claim for Node.  Succeeds if no peer other than Me
%   replies with anything but `true`.  Each such reply is logged on
%   the debug topic paxos(node).

claim_node(Node, Me) :-
    paxos_message(claim_node(Node, Reply):Peer, 0.25, Query),
    \+ ( broadcast_request(Query),
         Peer \== Me,
         debug(paxos(node), 'Claim ~p ~p: ~p', [Node, Peer, Reply]),
         Reply \== true
       ).
337
%!  set_quorum(+Node, +Quorum0)
%
%   Add the bit for Node to the bitmap Quorum0, record the result as
%   quorum/1 and publish it as the `quorum` admin value.

set_quorum(Node, Quorum0) :-
    NodeBit is 1<<Node,
    Quorum is Quorum0 \/ NodeBit,
    debug(paxos(node), 'Adding ~d to quorum (now 0x~16r)', [Node, Quorum]),
    asserta(quorum(Quorum)),
    paxos_set_admin(quorum, Quorum).
343
344
351
%!  paxos_rejoin
%
%   Make sure our own node is not part of the shared `dead` bitmap.
%   The read/clear/write sequence is retried (repeat/0) until both
%   reading and, when needed, writing the admin value succeed.

paxos_rejoin :-
    node(Self),
    repeat,
        paxos_get_admin(dead, Current),
        Cleared is Current /\ \(1<<Self),
        (   Cleared \== Current
        ->  paxos_set_admin(dead, Cleared)
        ;   true
        ),
    !.
362
370
%!  paxos_leave
%
%   Leave the network.  If this process has a node id it flags itself
%   as `leaving`, updates the shared quorum and dead sets through
%   paxos_leave/1, broadcasts a forget message for its own bit and
%   stops listening.  Without a node id this is a no-op.

paxos_leave :-
    (   node(Node)
    ->  asserta(leaving),
        paxos_leave(Node),
        Set is 1<<Node,
        paxos_message(forget(Set), -, Forget),
        broadcast(Forget),
        unlisten(paxos),
        retractall(leaving)
    ;   true
    ).
382
%!  paxos_leave(+Node)
%
%   Remove Node from the shared quorum and add it to the shared set of
%   dead nodes.
%
%   The previous definition had a second catch-all clause behind an
%   unconditional leading cut in the first clause.  That clause could
%   never be reached and has been removed together with the cut, which
%   then had nothing left to prune.

paxos_leave(Node) :-
    paxos_update_set(quorum, del(Node)),
    paxos_update_set(dead,   add(Node)).
388
%!  paxos_update_set(+Set, +How)
%
%   Update the admin bitmap Set (called as a unary goal, e.g.
%   quorum/1 or dead/1) by setting (How = add(Node)) or clearing
%   (How = del(Node)) the bit of Node.  The new value is only written
%   through paxos_set_admin/2 if it differs from the current one.
%
%   The body sits in a repeat/0 loop: any failing step, including a
%   failing paxos_set_admin/2, restarts the attempt.

paxos_update_set(Set, How) :-
    repeat,
        Current =.. [Set,Old],
        call(Current),
        (   How = add(Node)
        ->  New is Old \/ (1<<Node)
        ;   How = del(Node)
        ->  New is Old /\ \(1<<Node)
        ),
        (   Old \== New
        ->  paxos_set_admin(Set, New)
        ;   true
        ),
    !.
403
404 407
415
%!  update_failed(+Action, +Quorum, +Alive)
%
%   Process the outcome of a request sent to the bitmap Quorum for
%   which the bitmap Alive replied.  Members of Quorum missing from
%   Alive are the failed nodes.
%
%     - failed/3 records of nodes that did reply are dropped (alive/1).
%     - The failure score of every failed node is updated
%       (consider_dead/1).
%     - failed/1 is replaced if the failed set changed.  In that case,
%       and if Action is `set`, the replicator is (re)started.

update_failed(Action, Quorum, Alive) :-
    Missing is Quorum /\ \Alive,
    alive(Alive),
    consider_dead(Missing),
    (   failed(Missing)
    ->  true
    ;   (   clause(failed(_), true, OldRef)
        ->  asserta(failed(Missing)),
            erase(OldRef),
            debug(paxos(node), 'Updated failed quorum to 0x~16r', [Missing])
        ;   asserta(failed(Missing))
        ),
        (   Action == set
        ->  start_replicator
        ;   true
        )
    ).
433
%!  consider_dead(+Failed)
%
%   Call consider_dead1/1 for every node whose bit is set in the
%   bitmap Failed, lowest bit first.

consider_dead(Failed) :-
    (   Failed == 0
    ->  true
    ;   Lowest is lsb(Failed),
        consider_dead1(Lowest),
        Remaining is Failed /\ \(1<<Lowest),
        consider_dead(Remaining)
    ).
441
%!  consider_dead1(+Node)
%
%   Register a failure of Node in failed(Node, LastTime, Score).
%
%   The first failure creates the record with score 10.  Each later
%   failure halves the old score for every `death_half_life` seconds
%   passed since the previous failure and then adds 10.  When the
%   resulting score reaches the `death_score` setting, Node is
%   considered dead and removed from the network using paxos_leave/1.
%
%   The test used to read `NewScore < DeathScore`.  That declared a
%   node dead on its second failure and never once the score had
%   accumulated, which defeats both settings.

consider_dead1(Node) :-
    clause(failed(Node, Last, Score), true, Ref),
    !,
    setting(death_half_life, HalfLife),
    setting(death_score, DeathScore),
    get_time(Now),
    Passed is Now-Last,
    NewScore is Score*(2**(-Passed/HalfLife)) + 10,
    % Add the new record before erasing the old one.
    asserta(failed(Node, Now, NewScore)),
    erase(Ref),
    (   NewScore >= DeathScore
    ->  debug(paxos(node), 'Consider node ~d dead', [Node]),
        paxos_leave(Node)
    ;   true
    ).
consider_dead1(Node) :-
    get_time(Now),
    asserta(failed(Node, Now, 10)).
460
%!  alive(+Bitmap)
%
%   Erase the failed/3 record of every node whose bit is set in
%   Bitmap.  Always succeeds.

alive(Bitmap) :-
    forall(( clause(failed(Node, _Last, _Score), true, Ref),
             Bitmap /\ (1<<Node) =\= 0
           ),
           erase(Ref)).
468
469
477
%!  life_quorum(-Quorum, -LifeQuorum)
%
%   Quorum is the current quorum/1 bitmap.  LifeQuorum is Quorum
%   without the nodes in failed/1, provided some nodes have failed and
%   the remaining nodes still form a majority of Quorum.  Otherwise
%   LifeQuorum equals Quorum.

life_quorum(Quorum, LifeQuorum) :-
    quorum(Quorum),
    (   failed(Failing),
        Failing \== 0,
        Remaining is Quorum /\ \Failing,
        majority(Remaining, Quorum)
    ->  LifeQuorum = Remaining
    ;   LifeQuorum = Quorum
    ).
487
488
489 492
% Track changes to the two administrative keys: a changed quorum value
% is handed to update_quorum/1 and a changed dead-nodes value to
% update_dead/1.

:- admin_key(quorum, QuorumKey),
   listen(paxos_changed(QuorumKey, NewQuorum),
          update_quorum(NewQuorum)).
:- admin_key(dead, DeadKey),
   listen(paxos_changed(DeadKey, NewDead),
          update_dead(NewDead)).
%!  update_quorum(+Proposed)
%
%   React to a proposed quorum bitmap.
%
%     - If it equals our current quorum/1, nothing changes.
%     - If we are leaving, or Proposed contains our own bit, it
%       replaces quorum/1.
%     - Otherwise we add our own bit, store the result locally and
%       propose it to the network from a background thread.

update_quorum(Proposed) :-
    debug(paxos(node), 'Received quorum proposal 0x~16r', [Proposed]),
    (   quorum(Proposed)
    ->  true
    ;   leaving
    ->  update(quorum(Proposed))
    ;   node(Me),
        Proposed /\ (1<<Me) =\= 0
    ->  update(quorum(Proposed))
    ;   node(Me),
        WithMe is Proposed \/ (1<<Me),
        update(quorum(WithMe)),
        debug(paxos(node), 'I''m not in the quorum! Proposing 0x~16r', [WithMe]),
        paxos_set_admin_bg(quorum, WithMe)
    ).
519
%!  update_dead(+Proposed)
%
%   React to a proposed bitmap of dead nodes.
%
%     - If it equals our current dead/1, nothing changes.
%     - If we are leaving, or Proposed does not contain our own bit,
%       it replaces dead/1.
%     - Otherwise we clear our own bit, store the result locally and
%       propose it to the network from a background thread.

update_dead(Proposed) :-
    debug(paxos(node), 'Received dead proposal 0x~16r', [Proposed]),
    (   dead(Proposed)
    ->  true
    ;   leaving
    ->  update(dead(Proposed))
    ;   node(Me),
        Proposed /\ (1<<Me) =:= 0
    ->  update(dead(Proposed))
    ;   node(Me),
        WithoutMe is Proposed /\ \(1<<Me),
        update(dead(WithoutMe)),
        paxos_set_admin_bg(dead, WithoutMe)
    ).
538
%!  update(+Clause)
%
%   Assert the fact Clause and erase the first existing fact with the
%   same name and arity, if there is one.  The new fact is added
%   before the old one is erased.

update(Clause) :-
    functor(Clause, Name, Arity),
    functor(Generic, Name, Arity),
    (   clause(Generic, true, OldRef)
    ->  true
    ;   OldRef = none
    ),
    asserta(Clause),
    (   OldRef == none
    ->  true
    ;   erase(OldRef)
    ).
547
548 551
586
%!  paxos_message(?Message)
%
%   Handle an incoming Paxos message.  Unbound arguments of Message
%   are filled in as the reply.
%
%     - prepare(Key,Node,Gen,Value): reply with our node id and the
%       generation we hold for Key, creating a generation 0 entry if
%       Key is not in the ledger.
%     - accept(Key,Node,Gen,Reply,Value): try to store Value at Gen;
%       Reply is Gen on success and `nack` otherwise.
%     - changed(Key,Gen,Value,Acceptors): record the holders and
%       broadcast paxos_changed(Key,Value) locally.
%     - learn(Key,Node,Gen,Reply,Value): replication request; Reply is
%       Gen if we learned (or already had) the value, else `nack`.
%     - learned(Key,Gen,Value,Acceptors): record the holders.
%     - retrieve(Key,Node,Gen,Value): reply with our ledger entry.
%     - forget(Nodes): drop Nodes from the ledger's holder sets.
%     - node(Node,Quorum,Dead): reply with our status; a node without
%       complete status answers self/0/Salt.
%     - claim_node(Node,Ok): Ok is `false` if Node is our id.

paxos_message(prepare(Key,Self,Generation,Value)) :-
    node(Self),
    (   ledger(Key, Generation, _)
    ->  true
    ;   Generation = 0,
        ledger_create(Key, Generation, Value)
    ),
    debug(paxos, 'Prepared ~p-~p@~d', [Key,Value,Generation]).
paxos_message(accept(Key,Self,Generation,Reply,Value)) :-
    node(Self),
    (   ledger_update(Key, Generation, Value)
    ->  debug(paxos, 'Accepted ~p-~p@~d', [Key,Value,Generation]),
        Reply = Generation
    ;   debug(paxos, 'Rejected ~p-~p@~d', [Key,Value,Generation]),
        Reply = nack
    ).
paxos_message(changed(Key,Generation,Value,Acceptors)) :-
    debug(paxos, 'Changed ~p-~p@~d for ~p',
          [Key, Value, Generation, Acceptors]),
    ledger_update_holders(Key, Generation, Acceptors),
    broadcast(paxos_changed(Key,Value)).
paxos_message(learn(Key,Self,Generation,Reply,Value)) :-
    node(Self),
    debug(paxos, 'Learn ~p-~p@~p?', [Key, Value, Generation]),
    (   ledger_learn(Key, Generation, Value)
    ->  debug(paxos, 'Learned ~p-~p@~d', [Key,Value,Generation]),
        Reply = Generation
    ;   debug(paxos, 'Rejected ~p@~d', [Key, Generation]),
        Reply = nack
    ).
paxos_message(learned(Key,Generation,_Value,Acceptors)) :-
    ledger_update_holders(Key, Generation, Acceptors).
paxos_message(retrieve(Key,Self,Generation,Value)) :-
    node(Self),
    debug(paxos, 'Retrieving ~p', [Key]),
    ledger(Key, Generation, Value),
    debug(paxos, 'Retrieved ~p-~p@~d', [Key,Value,Generation]),
    !.
paxos_message(forget(Nodes)) :-
    ledger_forget(Nodes).
paxos_message(node(Node,Quorum,Dead)) :-
    (   node(Node),
        quorum(Quorum),
        dead(Dead)
    ->  true
    ;   salt(Salt),
        Node = self,
        Quorum = 0,
        Dead = Salt                 % the salt travels in the Dead slot
    ).
paxos_message(claim_node(Node, Ok)) :-
    (   node(Node)
    ->  Ok = false
    ;   Ok = true
    ).
641
642
643 646
652
682
%!  paxos_set(+Term)
%
%   Store the compound Term under the key derived from its name and
%   arity (see paxos_key/2), using default options.

paxos_set(Term) :-
    paxos_key(Term, DerivedKey),
    paxos_set(DerivedKey, Term, []).

%!  paxos_set(+Key, +Value)
%
%   Same as paxos_set(Key, Value, []).

paxos_set(Key, Value) :-
    NoOptions = [],
    paxos_set(Key, Value, NoOptions).
689
%!  paxos_set(+Key, +Value, +Options)
%
%   Try to get the network to agree on Value for Key.  Key and Value
%   must be ground.  Options processed:
%
%     - retry(+Count): number of retries; defaults to the setting
%       `max_sets`.
%     - timeout(+Seconds): reply timeout; defaults to the setting
%       `response_timeout`.
%
%   Each attempt
%
%     1. sends `prepare` to the quorum and requires replies from a
%        majority,
%     2. proposes the highest reported generation plus one in an
%        `accept` message to the live quorum, stopping at the first
%        `nack`, and requires a majority that intersects with the
%        prepared nodes and unanimously replied the new generation,
%     3. broadcasts the log and `changed` messages and updates the
%        failure administration.
%
%   Fails if no attempt succeeds.
%
%   @error instantiation_error if Key-Value is not ground.

paxos_set(Key, Value, Options) :-
    must_be(ground, Key-Value),
    paxos_initialize,
    option(retry(Retries), Options, Retries),
    option(timeout(Timeout), Options, Timeout),
    apply_default(Retries, max_sets),
    apply_default(Timeout, response_timeout),
    paxos_message(prepare(Key,PrepNode,PrepGen,Value), Timeout, Prepare),
    between(0, Retries, _),
      life_quorum(Quorum, Alive),
      debug(paxos, 'Set: ~p -> ~p', [Key, Value]),
      collect(Quorum, false, PrepNode, PrepGen, Prepare,
              PrepGens, PrepNodes),
      debug(paxos, 'Set: quorum: 0x~16r, prepared by 0x~16r, gens ~p',
            [Quorum, PrepNodes, PrepGens]),
      majority(PrepNodes, Quorum),
      max_list(PrepGens, MaxGen),
      succ(MaxGen, NextGen),
      paxos_message(accept(Key,AccNode,NextGen,AccGen,Value),
                    Timeout, Accept),
      collect(Alive, AccGen == nack, AccNode, AccGen, Accept,
              AccGens, AcceptNodes),
      majority(AcceptNodes, Quorum),
      intersecting(PrepNodes, AcceptNodes),
      c_element(AccGens, MaxGen, NextGen),
      broadcast(paxos(log(Key,Value,AcceptNodes,NextGen))),
      paxos_message(changed(Key,NextGen,Value,AcceptNodes), -, Changed),
      broadcast(Changed),
      update_failed(set, Quorum, AcceptNodes),
    !.
717
%!  apply_default(?Var, +Setting)
%
%   If Var is unbound, bind it to the value of Setting.  A bound Var
%   is left untouched.

apply_default(Var, Setting) :-
    (   var(Var)
    ->  setting(Setting, Var)
    ;   true
    ).
723
%!  majority(+SubSet, +Set)
%
%   True if the bitmap SubSet has at least (N+2)//2 bits set, where N
%   is the number of bits set in the bitmap Set.

majority(SubSet, Set) :-
    Have is popcount(SubSet),
    Need is (popcount(Set)+2)//2,
    Have >= Need.
726
%!  intersecting(+Set1, +Set2)
%
%   True if the bitmaps Set1 and Set2 have at least one bit in common.

intersecting(Set1, Set2) :-
    Common is Set1 /\ Set2,
    Common =\= 0.
729
730
742
%!  collect(+Quorum, :Stop, ?Node, ?Template, ?Message,
%!          -Result, -NodeSet)
%
%   Broadcast the request Message and gather the replies.  For each
%   reply a copy of Template is appended to Result and the bit of the
%   replying Node is added to NodeSet.  Gathering ends when
%
%     - the goal Stop succeeds for a reply (that reply is not added),
%     - all members of the bitmap Quorum have replied, or
%     - broadcast_request/1 has no more replies.
%
%   Replies are kept across backtracking over broadcast_request/1 by
%   destructively extending an open list, whose last cell is tracked
%   in list/1, and by updating the bitmap in state/1.
%
%   Only when the loop ends because the whole quorum replied do the
%   bindings of the last reply remain visible in Message.

collect(Quorum, Stop, Node, Template, Message, Result, NodeSet) :-
    Seen = state(0),
    First = [dummy|_],
    Tail = list(First),
    (   broadcast_request(Message),
        (   Stop
        ->  !,                          % local to this condition
            fail
        ;   true
        ),
        duplicate_term(Template, Copy),
        NewLastCell = [Copy|_],
        arg(1, Tail, LastCell),
        nb_linkarg(2, LastCell, NewLastCell),
        nb_linkarg(1, Tail, NewLastCell),
        arg(1, Seen, Replied0),
        Replied is Replied0 \/ (1<<Node),
        nb_setarg(1, Seen, Replied),
        Quorum /\ Replied =:= Quorum
    ->  true
    ;   true
    ),
    arg(1, Seen, NodeSet),
    arg(1, Tail, [_]),                  % close the open answer list
    First = [_|Result].
768
774
797
%!  paxos_get(?Term)
%
%   Retrieve the value stored under the key derived from the name and
%   arity of the compound Term (see paxos_key/2) and unify it with
%   Term.

paxos_get(Term) :-
    paxos_key(Term, DerivedKey),
    paxos_get(DerivedKey, Term, []).

%!  paxos_get(+Key, -Value)
%
%   Same as paxos_get(Key, Value, []).

paxos_get(Key, Value) :-
    NoOptions = [],
    paxos_get(Key, Value, NoOptions).
803
%!  paxos_get(+Key, -Value, +Options)
%
%   Get the Value for Key.  A value with valid holders in the local
%   ledger is returned immediately.  Otherwise the other members of
%   the live quorum are asked and the value reported by a majority of
%   them is returned, after storing it using paxos_set/2.  Options
%   processed:
%
%     - retry(+Count): number of retries; defaults to the setting
%       `max_gets`.
%     - timeout(+Seconds): reply timeout; defaults to the setting
%       `response_timeout`.
%
%   Fails if no attempt produces a majority.
%
%   The request is built from a copy of Value and Value is unified
%   with the majority value explicitly.  Previously Value was part of
%   the request itself and never related to the majority value.  As
%   collect/7 only leaves the bindings of the _last_ reply behind when
%   the whole quorum replied, Value could come back unbound or bound
%   to a value that was not the majority value.

paxos_get(Key, Value, _) :-
    ledger(Key, _Line, Value),
    !.
paxos_get(Key, Value, Options) :-
    paxos_initialize,
    option(retry(Retries), Options, Retries),
    option(timeout(TMO), Options, TMO),
    apply_default(Retries, max_gets),
    apply_default(TMO, response_timeout),
    copy_term(Value, Template),         % keep Value out of the request
    Msg = Line-Template,
    paxos_message(retrieve(Key,Nr,Line,Template), TMO, Retrieve),
    node(Node),
    between(0, Retries, _),
      life_quorum(Quorum, Alive),
      QuorumA is Alive /\ \(1<<Node),   % we only ask the others
      collect(QuorumA, false, Nr, Msg, Retrieve, Terms, RetrievedNodes),
      debug(paxos, 'Retrieved: ~p from 0x~16r', [Terms, RetrievedNodes]),
      highest_vote(Terms, _Line-MajorityValue, Count),
      debug(paxos, 'Best: ~p with ~d votes', [MajorityValue, Count]),
      Count >= (popcount(QuorumA)+2)//2,
      debug(paxos, 'Retrieve: accept ~p', [MajorityValue]),
      update_failed(get, Quorum, RetrievedNodes),
      paxos_set(Key, MajorityValue),
      Value = MajorityValue,
    !.
828
%!  highest_vote(+Terms, -Term, -Count)
%
%   Term is the element of Terms that occurs most often and Count is
%   its number of occurrences.  Fails if Terms is empty.

highest_vote(Terms, Term, Count) :-
    msort(Terms, Ordered),
    count_votes(Ordered, Tally),
    sort(1, >, Tally, [Count-Term|_]).

%   count_votes(+Sorted, -Counted): run-length encode a sorted list
%   into Count-Element pairs.

count_votes([], []).
count_votes([Elem|Tail], [Count-Elem|Counted]) :-
    count_same(Elem, Tail, 1, Count, Rest),
    count_votes(Rest, Counted).

%   count_same(+Elem, +List, +Count0, -Count, -Rest): Count is Count0
%   plus the number of leading elements of List that are identical
%   (==) to Elem; Rest is the remainder of List.

count_same(Elem, List, Count0, Count, Rest) :-
    (   List = [Next|Tail],
        Elem == Next
    ->  Count1 is Count0+1,
        count_same(Elem, Tail, Count1, Count, Rest)
    ;   Count = Count0,
        Rest = List
    ).
845
852
%!  paxos_key(+Term, -Key)
%
%   Key is '$c'(Name,Arity), derived from the name and arity of the
%   compound Term.
%
%   @error type_error(compound, Term) if Term is not compound
%          (instantiation_error if it is unbound).

paxos_key(Term, '$c'(Functor,ArgCount)) :-
    compound(Term),
    !,
    compound_name_arity(Term, Functor, ArgCount).
paxos_key(Term, _) :-
    must_be(compound, Term).
858
859
860 863
872
%!  start_replicator
%
%   Wake up the replicator thread by sending it `run`.  If sending
%   raises an existence error, i.e. there is no thread with the alias
%   `paxos_replicator`, the thread is created.  A permission error
%   while creating the thread is ignored.

start_replicator :-
    (   catch(thread_send_message(paxos_replicator, run),
              error(existence_error(_,_),_),
              fail)
    ->  true
    ;   catch(thread_create(replicator, _,
                            [ alias(paxos_replicator),
                              detached(true)
                            ]),
              error(permission_error(_,_,_),_),
              true)
    ).
885
%!  replicator
%
%   Main loop of the replicator thread.  It never terminates
%   normally.  Each iteration computes the quorum members that are not
%   marked dead and then
%
%     - waits for a message if no other node is left,
%     - replicates one key to those nodes and pauses for
%       1/replication_rate seconds (or until a message arrives), or
%     - waits for a message if nothing needs to be replicated.
%
%   The state term counts the keys replicated in the current burst,
%   which is only used for debug output.

replicator :-
    setting(replication_rate, Rate),
    Pause is 1/Rate,
    node(Self),
    debug(paxos(replicate), 'Starting replicator', []),
    Progress = state(idle),
    repeat,
        quorum(Quorum),
        dead(Dead),
        Peers is Quorum /\ \Dead,
        (   Peers /\ \(1<<Self) =:= 0
        ->  debug(paxos(replicate),
                  'Me: ~d, Quorum: 0x~16r, Dead: 0x~16r: I''m alone, waiting ...',
                  [Self, Quorum, Dead]),
            thread_get_message(_)
        ;   paxos_replicate_key(Peers, Key, [])
        ->  replicated(Progress, key(Key)),
            thread_self(Me),
            thread_get_message(Me, _, [timeout(Pause)])
        ;   replicated(Progress, idle),
            thread_get_message(_)
        ),
    fail.
910
%!  replicated(+State, +Event)
%
%   Maintain the replication counter held in the first argument of
%   State, which is either `idle` or the number of keys replicated in
%   the current burst.
%
%     - Event = key(_): start counting at 1 when idle, otherwise
%       increment the counter.
%     - Event = idle: when a burst was active, report its size on the
%       debug topic paxos(replicate) and return to `idle`.

replicated(State, key(_Key)) :-
    !,
    arg(1, State, Current),
    (   Current == idle
    ->  debug(paxos(replicate), 'Start replicating ...', []),
        nb_setarg(1, State, 1)
    ;   Next is Current+1,
        nb_setarg(1, State, Next)
    ).
replicated(State, idle) :-
    arg(1, State, Current),
    (   Current == idle
    ->  true
    ;   debug(paxos(replicate), 'Replicated ~D keys', [Current]),
        nb_setarg(1, State, idle)
    ).
928
929
938
%!  paxos_replicate_key(+Nodes, ?Key, +Options)
%
%   Replicate a Key to the bitmap Nodes.  If Key is unbound, a key
%   that needs replication is selected by replication_key/2.  The
%   current ledger entry is offered in a `learn` message; collecting
%   replies stops at the first `nack`.  The nodes that replied are
%   added to the holders and announced in a `learned` broadcast.
%   Options processed:
%
%     - timeout(+Seconds): reply timeout; defaults to the setting
%       `response_timeout`.

paxos_replicate_key(Nodes, Key, Options) :-
    replication_key(Nodes, Key),
    option(timeout(Timeout), Options, Timeout),
    apply_default(Timeout, response_timeout),
    ledger_current(Key, Generation, Value, Holders),
    paxos_message(learn(Key,Peer,Generation,Reply,Value), Timeout, Learn),
    collect(Nodes, Reply == nack, Peer, Reply, Learn, _Replies, Learners),
    AllHolders is Holders \/ Learners,
    paxos_message(learned(Key,Generation,Value,AllHolders), -, Learned),
    broadcast(Learned),
    update_failed(replicate, Nodes, Learners).
950
%!  replication_key(+Nodes, ?Key)
%
%   A ground Key is accepted as is.  Otherwise Key is bound to a key
%   that needs replication to Nodes.  A random candidate (the Nth,
%   with Nth between 1 and the number of nodes) is tried first; if
%   there are fewer candidates than that, the first one is used.

replication_key(Nodes, Key) :-
    (   ground(Key)
    ->  true
    ;   once(( (   Nth is 1+random(popcount(Nodes))
               ;   Nth = 1
               ),
               call_nth(needs_replicate(Nodes, Key), Nth)
             ))
    ).

%   needs_replicate(+Nodes, -Key): Key is a non-admin ledger key with
%   valid holders for which at least one member of Nodes is not yet a
%   holder.

needs_replicate(Nodes, Key) :-
    ledger_current(Key, _Generation, _Value, Holders),
    Lacking is Nodes /\ \Holders,
    Lacking =\= 0,
    \+ admin_key(_, Key).
965
966
967 970
986
%!  paxos_on_change(?Term, :Goal)
%
%   Register Goal for changes of the key derived from the name and
%   arity of the compound Term (see paxos_key/2).

paxos_on_change(Term, Goal) :-
    paxos_key(Term, DerivedKey),
    paxos_on_change(DerivedKey, Term, Goal).

%!  paxos_on_change(?Key, ?Value, :Goal)
%
%   Run Goal, through key_changed/3, whenever a paxos_changed(Key,
%   Value) event is broadcast.  If Goal is `ignore`, the listener for
%   this template is removed instead.  Registering a listener also
%   initialises Paxos.
%
%   @error type_error(callable, Goal) if Goal is not callable.

paxos_on_change(Key, Value, Goal) :-
    Goal = _Module:Plain,
    must_be(callable, Plain),
    Event = paxos_changed(Key,Value),
    (   Plain == ignore
    ->  unlisten(paxos_user, Event)
    ;   listen(paxos_user, Event, key_changed(Key, Value, Goal)),
        paxos_initialize
    ).
1000
%!  key_changed(+Key, +Value, :Goal)
%
%   Run Goal in a new detached thread.  Errors raised while creating
%   the thread are passed to key_error/1.

key_changed(_Key, _Value, Goal) :-
    catch(thread_create(Goal, _, [detached(true)]),
          error(Formal, Context),
          key_error(error(Formal, Context))).

%   Silently accept a permission error on thread creation; print any
%   other error.

key_error(Error) :-
    (   Error = error(permission_error(create, thread, _), _)
    ->  true
    ;   print_message(error, Error)
    ).
1010
1011
1012 1015
1019
1023
1031
%!  paxos_message(+PaxOS, +TimeOut, -BroadcastMessage)
%
%   Translate a Paxos message into the term that is actually
%   broadcast by calling the multifile hook paxos_message_hook/3 on
%   the message wrapped in paxos/1.  A message of the form
%   Paxos:From is first offered to the hook as paxos(Paxos):From.
%
%   @error mode_error(det, fail, _) if the hook does not handle the
%          message.

paxos_message(Request:Sender, Timeout, Wire) :-
    paxos_message_hook(paxos(Request):Sender, Timeout, Wire),
    !.
paxos_message(Request, Timeout, Wire) :-
    (   paxos_message_hook(paxos(Request), Timeout, Wire)
    ->  true
    ;   throw(error(mode_error(det, fail,
                               paxos:paxos_message_hook(Request, Timeout, Wire)),
                    _))
    ).
1041
1042
1043 1046
1047:- dynamic
1048 paxons_ledger/4. 1049
1053
%!  ledger_current(?Key, ?Gen, ?Value, ?Holders)
%
%   Enumerate the ledger entries whose Holders field is valid, i.e.
%   an integer bitmap (see valid/1).

ledger_current(Key, Gen, Value, Holders) :-
    paxons_ledger(Key, Gen, Value, Holders),
    valid(Holders).

%!  ledger(+Key, ?Gen, ?Value)
%
%   Value and Gen are taken from the first ledger entry for Key that
%   has valid holders.  Value is unified only after committing to
%   that entry.

ledger(Key, Gen, Value) :-
    once(( paxons_ledger(Key, Gen, Stored, Holders),
           valid(Holders)
         )),
    Value = Stored.

%!  ledger_create(+Key, +Gen, +Value)
%
%   Add a new entry for Key.  Its holders field is created(Time),
%   which is not a valid holder set.

ledger_create(Key, Gen, Value) :-
    get_time(Stamp),
    asserta(paxons_ledger(Key, Gen, Value, created(Stamp))).
1079
1084
%!  ledger_update(+Key, +Gen, +Value)
%
%   Accept Value at generation Gen if Key has an entry and Gen is
%   newer than the generation of the first entry for Key.  The new
%   entry is marked accepted(Time).  If the old generation was 0, the
%   generation-0 entries for Key are removed.  Fails if there is no
%   entry for Key or Gen is not newer.

ledger_update(Key, Gen, Value) :-
    once(paxons_ledger(Key, OldGen, _, _)),
    Gen > OldGen,
    get_time(Stamp),
    asserta(paxons_ledger(Key, Gen, Value, accepted(Stamp))),
    (   OldGen == 0
    ->  retractall(paxons_ledger(Key, OldGen, _, _))
    ;   true
    ).
1095
1099
%!  ledger_update_holders(+Key, +Gen, +Holders)
%
%   Set the holders of the entry for Key at generation Gen to
%   Holders.  If the previous holders field was not a valid bitmap,
%   the older generations of Key are removed.  Fails if there is no
%   entry for Key at Gen.

ledger_update_holders(Key, Gen, Holders) :-
    once(clause(paxons_ledger(Key, Gen, Value, OldHolders), true, Ref)),
    (   OldHolders == Holders
    ->  true
    ;   asserta(paxons_ledger(Key, Gen, Value, Holders)),
        erase(Ref)
    ),
    clean_key(OldHolders, Key, Gen).

%   clean_key(+OldHolders, +Key, +Gen): unless OldHolders is a valid
%   bitmap, erase all entries for Key with a generation below Gen.

clean_key(OldHolders, Key, Gen) :-
    (   valid(OldHolders)
    ->  true
    ;   forall(( clause(paxons_ledger(Key, Older, _Value, _Holders),
                        true, Ref),
                 Older < Gen
               ),
               erase(Ref))
    ).
1120
1121
1125
%!  ledger_learn(+Key, +Gen, +Value)
%
%   Learn Value for Key at generation Gen from a replication request.
%
%     - Without an entry for Key, a new learned(Time) entry is added.
%     - If the first entry for Key has the same generation and value,
%       nothing changes.
%     - If Gen is newer than that entry, a learned(Time) entry is
%       added.
%     - Otherwise the predicate fails.

ledger_learn(Key, Gen, Value) :-
    (   paxons_ledger(Key, KnownGen, KnownValue, _Holders)
    ->  (   Gen == KnownGen,
            Value == KnownValue
        ->  true
        ;   Gen > KnownGen,
            get_time(Stamp),
            asserta(paxons_ledger(Key, Gen, Value, learned(Stamp)))
        )
    ;   get_time(Stamp),
        asserta(paxons_ledger(Key, Gen, Value, learned(Stamp)))
    ).
1139
1144
%!  ledger_forget(+Nodes)
%
%   Remove the bitmap Nodes from the holders of all ledger entries.
%   The work is done in a detached thread; a permission error while
%   creating that thread is ignored.

ledger_forget(Nodes) :-
    Goal = ledger_forget_threaded(Nodes),
    catch(thread_create(Goal, _, [detached(true)]),
          error(permission_error(create, thread, _), _),
          true).

%   Walk over all entries with valid holders and update each one.

ledger_forget_threaded(Nodes) :-
    debug(paxos(node), 'Forgetting 0x~16r', [Nodes]),
    forall(ledger_current(Key, Gen, _Value, Holders),
           ledger_forget(Nodes, Key, Gen, Holders)),
    debug(paxos(node), 'Forgotten 0x~16r', [Nodes]).

%   ledger_forget(+Nodes, +Key, +Gen, +Holders): store the holder set
%   without Nodes if that differs from Holders.  Always succeeds.

ledger_forget(Nodes, Key, Gen, Holders) :-
    Remaining is Holders /\ \Nodes,
    ignore(( Remaining \== Holders,
             ledger_update_holders(Key, Gen, Remaining)
           )).
1165
%!  valid(@Holders)
%
%   True if Holders is an integer, i.e. a bitmap of nodes rather than
%   one of the created(T), accepted(T) or learned(T) markers.

valid(HolderSet) :-
    integer(HolderSet).
1168
1169
1170 1173
1179
%!  c_element(+Inputs, +Old, -Value)
%
%   Value is the first element of Inputs if all elements of Inputs
%   are identical (==) to it.  Otherwise, including for an empty list,
%   Value is Old.

c_element([New|More], _Old, New) :-
    \+ ( member(Other, More),
         Other \== New
       ),
    !.
c_element(_Inputs, Old, Old).
1184
1189
%!  arg_union(+Arg, +NodeStatusList, -Set)
%
%   Set is the bitwise or of the Arg-th argument of every term in
%   NodeStatusList; 0 for an empty list.

arg_union(Arg, NodeStatusList, Set) :-
    maplist(arg(Arg), NodeStatusList, Bitmaps),
    list_union(Bitmaps, 0, Set).

%   list_union(+Sets, -Set): Set is the bitwise or of all bitmaps in
%   Sets, starting from the empty bitmap.

list_union(Bitmaps, Union) :-
    Empty = 0,
    list_union(Bitmaps, Empty, Union).
1196
1197list_union([], Set, Set).
1198list_union([H|T], Set0, Set) :-
1199 Set1 is Set0 \/ H,
1200 list_union(T, Set1, Set)