. (utf8) 2/* Part of SWI-Prolog 3 4 Author: Torbjörn Lager and Jan Wielemaker 5 E-mail: J.Wielemaker@vu.nl 6 WWW: http://www.swi-prolog.org 7 Copyright (C): 2014-2016, Torbjörn Lager, 8 VU University Amsterdam 9 All rights reserved. 10 11 Redistribution and use in source and binary forms, with or without 12 modification, are permitted provided that the following conditions 13 are met: 14 15 1. Redistributions of source code must retain the above copyright 16 notice, this list of conditions and the following disclaimer. 17 18 2. Redistributions in binary form must reproduce the above copyright 19 notice, this list of conditions and the following disclaimer in 20 the documentation and/or other materials provided with the 21 distribution. 22 23 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 24 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 25 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 26 FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 27 COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 28 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 29 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 30 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 31 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 32 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 33 ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 34 POSSIBILITY OF SUCH DAMAGE. 35*/ 36 37:- module(pengines, 38 [ pengine_create/1, % +Options 39 pengine_ask/3, % +Pengine, :Query, +Options 40 pengine_next/2, % +Pengine. +Options 41 pengine_stop/2, % +Pengine. +Options 42 pengine_event/2, % -Event, +Options 43 pengine_input/2, % +Prompt, -Term 44 pengine_output/1, % +Term 45 pengine_respond/3, % +Pengine, +Input, +Options 46 pengine_debug/2, % +Format, +Args 47 pengine_self/1, % -Pengine 48 pengine_pull_response/2, % +Pengine, +Options 49 pengine_destroy/1, % +Pengine 50 pengine_destroy/2, % +Pengine, +Options 51 pengine_abort/1, % +Pengine 52 pengine_application/1, % +Application 53 current_pengine_application/1, % ?Application 54 pengine_property/2, % ?Pengine, ?Property 55 pengine_user/1, % -User 56 pengine_event_loop/2, % :Closure, +Options 57 pengine_rpc/2, % +Server, :Goal 58 pengine_rpc/3 % +Server, :Goal, +Options 59 ]).
70:- use_module(library(http/http_dispatch)). 71:- use_module(library(http/http_parameters)). 72:- use_module(library(http/http_client)). 73:- use_module(library(http/http_json)). 74:- use_module(library(http/http_open)). 75:- use_module(library(http/http_stream)). 76:- use_module(library(http/http_wrapper)). 77:- use_module(library(http/http_cors)). 78:- use_module(library(thread_pool)). 79:- use_module(library(broadcast)). 80:- use_module(library(uri)). 81:- use_module(library(filesex)). 82:- use_module(library(time)). 83:- use_module(library(lists)). 84:- use_module(library(charsio)). 85:- use_module(library(apply)). 86:- use_module(library(aggregate)). 87:- use_module(library(option)). 88:- use_module(library(settings)). 89:- use_module(library(debug)). 90:- use_module(library(error)). 91:- use_module(library(sandbox)). 92:- use_module(library(modules)). 93:- use_module(library(term_to_json)). 94:- if(exists_source(library(uuid))). 95:- use_module(library(uuid)). 96:- endif. 97 98 99:- meta_predicate 100 pengine_create( ), 101 pengine_rpc( , , ), 102 pengine_event_loop( , ). 103 104:- multifile 105 write_result/3, % +Format, +Event, +Dict 106 event_to_json/3, % +Event, -JSON, +Format 107 prepare_module/3, % +Module, +Application, +Options 108 prepare_goal/3, % +GoalIn, -GoalOut, +Options 109 authentication_hook/3, % +Request, +Application, -User 110 not_sandboxed/2. % +User, +App 111 112:- predicate_options(pengine_create/1, 1, 113 [ id(-atom), 114 alias(atom), 115 application(atom), 116 destroy(boolean), 117 server(atom), 118 ask(compound), 119 template(compound), 120 chunk(integer), 121 bindings(list), 122 src_list(list), 123 src_text(any), % text 124 src_url(atom), 125 src_predicates(list) 126 ]). 127:- predicate_options(pengine_ask/3, 3, 128 [ template(any), 129 chunk(integer), 130 bindings(list) 131 ]). 132:- predicate_options(pengine_next/2, 2, 133 [ chunk(integer), 134 pass_to(pengine_send/3, 3) 135 ]). 136:- predicate_options(pengine_stop/2, 2, 137 [ pass_to(pengine_send/3, 3) 138 ]). 139:- predicate_options(pengine_respond/3, 2, 140 [ pass_to(pengine_send/3, 3) 141 ]). 142:- predicate_options(pengine_rpc/3, 3, 143 [ chunk(integer), 144 pass_to(pengine_create/1, 1) 145 ]). 146:- predicate_options(pengine_send/3, 3, 147 [ delay(number) 148 ]). 149:- predicate_options(pengine_event/2, 2, 150 [ pass_to(thread_get_message/3, 3) 151 ]). 152:- predicate_options(pengine_pull_response/2, 2, 153 [ pass_to(http_open/3, 3) 154 ]). 155:- predicate_options(pengine_event_loop/2, 2, 156 []). % not yet implemented 157 158% :- debug(pengine(transition)). 159:- debug(pengine(debug)). % handle pengine_debug in pengine_rpc/3. 160 161goal_expansion(random_delay, Expanded) :- 162 ( debugging(pengine(delay)) 163 -> Expanded = do_random_delay 164 ; Expanded = true 165 ). 166 167do_random_delay :- 168 Delay is random(20)/1000, 169 sleep(Delay). 170 171:- meta_predicate % internal meta predicates 172 solve( , , , ), 173 findnsols_no_empty( , , , ), 174 pengine_event_loop( , , ).
Remaining options are passed to http_open/3 (meaningful only for non-local pengines) and thread_create/3. Note that for thread_create/3 only options changing the stack-sizes can be used. In particular, do not pass the detached or alias options..
Successful creation of a pengine will return an event term of the following form:
An error will be returned if the pengine could not be created:
229pengine_create(M:Options0) :-
230 translate_local_sources(Options0, Options, M),
231 ( select_option(server(BaseURL), Options, RestOptions)
232 -> remote_pengine_create(BaseURL, RestOptions)
233 ; local_pengine_create(Options)
234 ).
src_predicates
and src_list
options into
src_text
. We need to do that anyway for remote pengines. For
local pengines, we could avoid this step, but there is very
little point in transferring source to a local pengine anyway as
local pengines can access any Prolog predicate that you make
visible to the application.
Multiple sources are concatenated to end up with a single src_text option.
248translate_local_sources(OptionsIn, Options, Module) :- 249 translate_local_sources(OptionsIn, Sources, Options2, Module), 250 ( Sources == [] 251 -> Options = Options2 252 ; Sources = [Source] 253 -> Options = [src_text(Source)|Options2] 254 ; atomics_to_string(Sources, Source) 255 -> Options = [src_text(Source)|Options2] 256 ). 257 258translate_local_sources([], [], [], _). 259translate_local_sources([H0|T], [S0|S], Options, M) :- 260 nonvar(H0), 261 translate_local_source(H0, S0, M), 262 !, 263 translate_local_sources(T, S, Options, M). 264translate_local_sources([H|T0], S, [H|T], M) :- 265 translate_local_sources(T0, S, T, M). 266 267translate_local_source(src_predicates(PIs), Source, M) :- 268 must_be(list, PIs), 269 with_output_to(string(Source), 270 maplist(list_in_module(M), PIs)). 271translate_local_source(src_list(Terms), Source, _) :- 272 must_be(list, Terms), 273 with_output_to(string(Source), 274 forall(member(Term, Terms), 275 format('~k .~n', [Term]))). 276translate_local_source(src_text(Source), Source, _). 277 278list_in_module(M, PI) :- 279 listing(M:PI).
pengine_send(NameOrID, Term, [])
.
*/
286pengine_send(Target, Event) :-
287 pengine_send(Target, Event, []).
Any remaining options are passed to http_open/3. */
302pengine_send(Target, Event, Options) :- 303 must_be(atom, Target), 304 pengine_send2(Target, Event, Options). 305 306pengine_send2(self, Event, Options) :- 307 !, 308 thread_self(Queue), 309 delay_message(queue(Queue), Event, Options). 310pengine_send2(Name, Event, Options) :- 311 child(Name, Target), 312 !, 313 delay_message(pengine(Target), Event, Options). 314pengine_send2(Target, Event, Options) :- 315 delay_message(pengine(Target), Event, Options). 316 317delay_message(Target, Event, Options) :- 318 option(delay(Delay), Options), 319 !, 320 alarm(Delay, 321 send_message(Target, Event, Options), 322 _AlarmID, 323 [remove(true)]). 324delay_message(Target, Event, Options) :- 325 random_delay, 326 send_message(Target, Event, Options). 327 328send_message(queue(Queue), Event, _) :- 329 thread_send_message(Queue, pengine_request(Event)). 330send_message(pengine(Pengine), Event, Options) :- 331 ( pengine_remote(Pengine, Server) 332 -> remote_pengine_send(Server, Pengine, Event, Options) 333 ; pengine_thread(Pengine, Thread) 334 -> thread_send_message(Thread, pengine_request(Event)) 335 ; existence_error(pengine, Pengine) 336 ).
343pengine_request(Request) :-
344 pengine_self(Self),
345 get_pengine_application(Self, Application),
346 setting(Application:idle_limit, IdleLimit),
347 thread_self(Me),
348 ( thread_get_message(Me, pengine_request(Request), [timeout(IdleLimit)])
349 -> true
350 ; Request = destroy
351 ).
If the message cannot be sent within the idle_limit
setting of
the pengine, abort the pengine.
364pengine_reply(Event) :- 365 pengine_parent(Queue), 366 pengine_reply(Queue, Event). 367 368pengine_reply(_Queue, _Event0) :- 369 nb_current(pengine_idle_limit_exceeded, true), 370 !. 371pengine_reply(Queue, Event0) :- 372 arg(1, Event0, ID), 373 wrap_first_answer(ID, Event0, Event), 374 random_delay, 375 debug(pengine(event), 'Reply to ~p: ~p', [Queue, Event]), 376 ( pengine_self(ID) 377 -> get_pengine_application(ID, Application), 378 setting(Application:idle_limit, IdleLimit), 379 debug(pengine(reply), 'Sending ~p, timout: ~q', [Event, IdleLimit]), 380 ( thread_send_message(Queue, pengine_event(ID, Event), 381 [ timeout(IdleLimit) 382 ]) 383 -> true 384 ; thread_self(Me), 385 debug(pengine(reply), 'pengine_reply: timeout for ~q (thread ~q)', 386 [ID, Me]), 387 nb_setval(pengine_idle_limit_exceeded, true), 388 thread_detach(Me), 389 abort 390 ) 391 ; thread_send_message(Queue, pengine_event(ID, Event)) 392 ). 393 394wrap_first_answer(ID, Event0, CreateEvent) :- 395 wrap_first_answer_in_create_event(CreateEvent, [answer(Event0)]), 396 arg(1, CreateEvent, ID), 397 !, 398 retract(wrap_first_answer_in_create_event(CreateEvent, [answer(Event0)])). 399wrap_first_answer(_ID, Event, Event). 400 401 402empty_queue :- 403 pengine_parent(Queue), 404 empty_queue(Queue, 0, Discarded), 405 debug(pengine(abort), 'Abort: discarded ~D messages', [Discarded]). 406 407empty_queue(Queue, C0, C) :- 408 thread_get_message(Queue, _Term, [timeout(0)]), 409 !, 410 C1 is C0+1, 411 empty_queue(Queue, C1, C). 412empty_queue(_, C, C).
Options is a list of options:
Name = Var
terms, providing access to the actual variable
names.Any remaining options are passed to pengine_send/3.
Note that the predicate pengine_ask/3 is deterministic, even for queries that have more than one solution. Also, the variables in Query will not be bound. Instead, results will be returned in the form of event terms.
true
or false
, indicating whether we can expect the
pengine to be able to return more solutions or not, would we call
pengine_next/2.Defined in terms of pengine_send/3, like so:
pengine_ask(ID, Query, Options) :- partition(pengine_ask_option, Options, AskOptions, SendOptions), pengine_send(ID, ask(Query, AskOptions), SendOptions).
*/
477pengine_ask(ID, Query, Options) :- 478 partition(pengine_ask_option, Options, AskOptions, SendOptions), 479 pengine_send(ID, ask(Query, AskOptions), SendOptions). 480 481 482pengine_ask_option(template(_)). 483pengine_ask_option(chunk(_)). 484pengine_ask_option(bindings(_)). 485pengine_ask_option(breakpoints(_)).
Remaining options are passed to pengine_send/3. The result of re-executing the current goal is returned to the caller's message queue in the form of event terms.
Defined in terms of pengine_send/3, as follows:
pengine_next(ID, Options) :- pengine_send(ID, next, Options).
*/
529pengine_next(ID, Options) :- 530 select_option(chunk(Count), Options, Options1), 531 !, 532 pengine_send(ID, next(Count), Options1). 533pengine_next(ID, Options) :- 534 pengine_send(ID, next, Options).
Defined in terms of pengine_send/3, like so:
pengine_stop(ID, Options) :- pengine_send(ID, stop, Options).
*/
550pengine_stop(ID, Options) :- pengine_send(ID, stop, Options).
561pengine_abort(Name) :-
562 ( child(Name, Pengine)
563 -> true
564 ; Pengine = Name
565 ),
566 ( pengine_remote(Pengine, Server)
567 -> remote_pengine_abort(Server, Pengine, [])
568 ; pengine_thread(Pengine, Thread),
569 debug(pengine(abort), 'Signalling thread ~p', [Thread]),
570 catch(thread_signal(Thread, throw(abort_query)), _, true)
571 ).
force(true)
, the pengine
is killed using abort/0 and pengine_destroy/2 succeeds.
*/581pengine_destroy(ID) :- 582 pengine_destroy(ID, []). 583 584pengine_destroy(Name, Options) :- 585 ( child(Name, ID) 586 -> true 587 ; ID = Name 588 ), 589 option(force(true), Options), 590 !, 591 ( pengine_thread(ID, Thread) 592 -> catch(thread_signal(Thread, abort), 593 error(existence_error(thread, _), _), true) 594 ; true 595 ). 596pengine_destroy(ID, _) :- 597 catch(pengine_send(ID, destroy), 598 error(existence_error(pengine, ID), _), 599 retractall(child(_,ID))). 600 601 602/*================= pengines administration ======================= 603*/
thread(ThreadId)
remote(URL)
614:- dynamic 615 current_pengine/6, % Id, ParentId, Thread, URL, App, Destroy 616 pengine_queue/4, % Id, Queue, TimeOut, Time 617 output_queue/3, % Id, Queue, Time 618 pengine_user/2, % Id, User 619 pengine_data/2. % Id, Data 620:- volatile 621 current_pengine/6, 622 pengine_queue/4, 623 output_queue/3, 624 pengine_user/2, 625 pengine_data/2. 626 627:- thread_local 628 child/2. % ?Name, ?Child
634pengine_register_local(Id, Thread, Queue, URL, Application, Destroy) :- 635 asserta(current_pengine(Id, Queue, Thread, URL, Application, Destroy)). 636 637pengine_register_remote(Id, URL, Application, Destroy) :- 638 thread_self(Queue), 639 asserta(current_pengine(Id, Queue, 0, URL, Application, Destroy)).
http
and the queue is the
message queue used to send events to the HTTP workers.647pengine_unregister(Id) :- 648 thread_self(Me), 649 ( current_pengine(Id, Queue, Me, http, _, _) 650 -> with_mutex(pengine, sync_destroy_queue_from_pengine(Id, Queue)) 651 ; true 652 ), 653 retractall(current_pengine(Id, _, Me, _, _, _)), 654 retractall(pengine_user(Id, _)), 655 retractall(pengine_data(Id, _)). 656 657pengine_unregister_remote(Id) :- 658 retractall(current_pengine(Id, _Parent, 0, _, _, _)).
664pengine_self(Id) :- 665 thread_self(Thread), 666 current_pengine(Id, _Parent, Thread, _URL, _Application, _Destroy). 667 668pengine_parent(Parent) :- 669 nb_getval(pengine_parent, Parent). 670 671pengine_thread(Pengine, Thread) :- 672 current_pengine(Pengine, _Parent, Thread, _URL, _Application, _Destroy), 673 Thread \== 0, 674 !. 675 676pengine_remote(Pengine, URL) :- 677 current_pengine(Pengine, _Parent, 0, URL, _Application, _Destroy). 678 679get_pengine_application(Pengine, Application) :- 680 current_pengine(Pengine, _Parent, _, _URL, Application, _Destroy), 681 !. 682 683get_pengine_module(Pengine, Pengine). 684 685:- if(current_predicate(uuid/2)). 686pengine_uuid(Id) :- 687 uuid(Id, [version(4)]). % Version 4 is random. 688:- else. 689:- use_module(library(random)). 690pengine_uuid(Id) :- 691 Max is 1<<128, 692 random_between(0, Max, Num), 693 atom_number(Id, Num). 694:- endif.
pengine_sandbox
. The example below creates a new application
address_book
and imports the API defined in the module file
adress_book_api.pl
into the application.
:- pengine_application(address_book). :- use_module(address_book:adress_book_api).
*/
710pengine_application(Application) :- 711 throw(error(context_error(nodirective, 712 pengine_application(Application)), _)). 713 714:- multifile 715 system:term_expansion/2, 716 current_application/1.
724current_pengine_application(Application) :- 725 current_application(Application). 726 727 728% Default settings for all applications 729 730:- setting(thread_pool_size, integer, 100, 731 'Maximum number of pengines this application can run.'). 732:- setting(thread_pool_stacks, list(compound), [], 733 'Maximum stack sizes for pengines this application can run.'). 734:- setting(slave_limit, integer, 3, 735 'Maximum number of slave pengines a master pengine can create.'). 736:- setting(time_limit, number, 300, 737 'Maximum time to wait for output'). 738:- setting(idle_limit, number, 300, 739 'Pengine auto-destroys when idle for this time'). 740:- setting(safe_goal_limit, number, 10, 741 'Maximum time to try proving safety of the goal'). 742:- setting(program_space, integer, 100_000_000, 743 'Maximum memory used by predicates'). 744:- setting(allow_from, list(atom), [*], 745 'IP addresses from which remotes are allowed to connect'). 746:- setting(deny_from, list(atom), [], 747 'IP addresses from which remotes are NOT allowed to connect'). 748:- setting(debug_info, boolean, false, 749 'Keep information to support source-level debugging'). 750 751 752systemterm_expansion((:- pengine_application(Application)), Expanded) :- 753 must_be(atom, Application), 754 ( module_property(Application, file(_)) 755 -> permission_error(create, pengine_application, Application) 756 ; true 757 ), 758 expand_term((:- setting(Application:thread_pool_size, integer, 759 setting(pengines:thread_pool_size), 760 'Maximum number of pengines this \c 761 application can run.')), 762 ThreadPoolSizeSetting), 763 expand_term((:- setting(Application:thread_pool_stacks, list(compound), 764 setting(pengines:thread_pool_stacks), 765 'Maximum stack sizes for pengines \c 766 this application can run.')), 767 ThreadPoolStacksSetting), 768 expand_term((:- setting(Application:slave_limit, integer, 769 setting(pengines:slave_limit), 770 'Maximum number of local slave pengines \c 771 a master pengine can create.')), 772 SlaveLimitSetting), 773 expand_term((:- setting(Application:time_limit, number, 774 setting(pengines:time_limit), 775 'Maximum time to wait for output')), 776 TimeLimitSetting), 777 expand_term((:- setting(Application:idle_limit, number, 778 setting(pengines:idle_limit), 779 'Pengine auto-destroys when idle for this time')), 780 IdleLimitSetting), 781 expand_term((:- setting(Application:safe_goal_limit, number, 782 setting(pengines:safe_goal_limit), 783 'Maximum time to try proving safety of the goal')), 784 SafeGoalLimitSetting), 785 expand_term((:- setting(Application:program_space, integer, 786 setting(pengines:program_space), 787 'Maximum memory used by predicates')), 788 ProgramSpaceSetting), 789 expand_term((:- setting(Application:allow_from, list(atom), 790 setting(pengines:allow_from), 791 'IP addresses from which remotes are allowed \c 792 to connect')), 793 AllowFromSetting), 794 expand_term((:- setting(Application:deny_from, list(atom), 795 setting(pengines:deny_from), 796 'IP addresses from which remotes are NOT \c 797 allowed to connect')), 798 DenyFromSetting), 799 expand_term((:- setting(Application:debug_info, boolean, 800 setting(pengines:debug_info), 801 'Keep information to support source-level \c 802 debugging')), 803 DebugInfoSetting), 804 flatten([ pengines:current_application(Application), 805 ThreadPoolSizeSetting, 806 ThreadPoolStacksSetting, 807 SlaveLimitSetting, 808 TimeLimitSetting, 809 IdleLimitSetting, 810 SafeGoalLimitSetting, 811 ProgramSpaceSetting, 812 AllowFromSetting, 813 DenyFromSetting, 814 DebugInfoSetting 815 ], Expanded). 816 817% Register default application 818 819:- pengine_application(pengine_sandbox).
alias
option when creating the pengine.true
if the pengines is destroyed automatically
after completing the query.debug_info
is present.
*/854pengine_property(Id, Prop) :- 855 nonvar(Id), nonvar(Prop), 856 pengine_property2(Id, Prop), 857 !. 858pengine_property(Id, Prop) :- 859 pengine_property2(Prop, Id). 860 861pengine_property2(self(Id), Id) :- 862 current_pengine(Id, _Parent, _Thread, _URL, _Application, _Destroy). 863pengine_property2(module(Id), Id) :- 864 current_pengine(Id, _Parent, _Thread, _URL, _Application, _Destroy). 865pengine_property2(alias(Alias), Id) :- 866 child(Alias, Id), 867 Alias \== Id. 868pengine_property2(thread(Thread), Id) :- 869 current_pengine(Id, _Parent, Thread, _URL, _Application, _Destroy), 870 Thread \== 0. 871pengine_property2(remote(Server), Id) :- 872 current_pengine(Id, _Parent, 0, Server, _Application, _Destroy). 873pengine_property2(application(Application), Id) :- 874 current_pengine(Id, _Parent, _Thread, _Server, Application, _Destroy). 875pengine_property2(destroy(Destroy), Id) :- 876 current_pengine(Id, _Parent, _Thread, _Server, _Application, Destroy). 877pengine_property2(parent(Parent), Id) :- 878 current_pengine(Id, Parent, _Thread, _URL, _Application, _Destroy). 879pengine_property2(source(SourceID, Source), Id) :- 880 pengine_data(Id, source(SourceID, Source)).
887pengine_output(Term) :-
888 pengine_self(Me),
889 pengine_reply(output(Me, Term)).
console.log(Message)
if there is a console. The predicate
pengine_rpc/3 calls debug(pengine(debug), '~w', [Message])
. The debug
topic pengine(debug)
is enabled by default.
904pengine_debug(Format, Args) :- 905 pengine_parent(Queue), 906 pengine_self(Self), 907 catch(safe_goal(format(atom(_), Format, Args)), E, true), 908 ( var(E) 909 -> format(atom(Message), Format, Args) 910 ; message_to_string(E, Message) 911 ), 912 pengine_reply(Queue, debug(Self, Message)). 913 914 915/*================= Local pengine ======================= 916*/
927local_pengine_create(Options) :-
928 thread_self(Self),
929 option(application(Application), Options, pengine_sandbox),
930 create(Self, Child, Options, local, Application),
931 option(alias(Name), Options, Child),
932 assert(child(Name, Child)).
939thread_poolcreate_pool(Application) :-
940 current_application(Application),
941 setting(Application:thread_pool_size, Size),
942 setting(Application:thread_pool_stacks, Stacks),
943 thread_pool_create(Application, Size, Stacks).
953create(Queue, Child, Options, local, Application) :- 954 !, 955 pengine_child_id(Child), 956 create0(Queue, Child, Options, local, Application). 957create(Queue, Child, Options, URL, Application) :- 958 pengine_child_id(Child), 959 catch(create0(Queue, Child, Options, URL, Application), 960 Error, 961 create_error(Queue, Child, Error)). 962 963pengine_child_id(Child) :- 964 ( nonvar(Child) 965 -> true 966 ; pengine_uuid(Child) 967 ). 968 969create_error(Queue, Child, Error) :- 970 pengine_reply(Queue, error(Child, Error)). 971 972create0(Queue, Child, Options, URL, Application) :- 973 ( current_application(Application) 974 -> true 975 ; existence_error(pengine_application, Application) 976 ), 977 ( URL \== http % pengine is _not_ a child of the 978 % HTTP server thread 979 -> aggregate_all(count, child(_,_), Count), 980 setting(Application:slave_limit, Max), 981 ( Count >= Max 982 -> throw(error(resource_error(max_pengines), _)) 983 ; true 984 ) 985 ; true 986 ), 987 partition(pengine_create_option, Options, PengineOptions, RestOptions), 988 thread_create_in_pool( 989 Application, 990 pengine_main(Queue, PengineOptions, Application), ChildThread, 991 [ at_exit(pengine_done) 992 | RestOptions 993 ]), 994 option(destroy(Destroy), PengineOptions, true), 995 pengine_register_local(Child, ChildThread, Queue, URL, Application, Destroy), 996 thread_send_message(ChildThread, pengine_registered(Child)), 997 ( option(id(Id), Options) 998 -> Id = Child 999 ; true 1000 ). 1001 1002pengine_create_option(src_text(_)). 1003pengine_create_option(src_url(_)). 1004pengine_create_option(application(_)). 1005pengine_create_option(destroy(_)). 1006pengine_create_option(ask(_)). 1007pengine_create_option(template(_)). 1008pengine_create_option(bindings(_)). 1009pengine_create_option(chunk(_)). 1010pengine_create_option(alias(_)). 1011pengine_create_option(user(_)).
at_exit
option. Destroys
child pengines using pengine_destroy/1.1019:- public 1020 pengine_done/0. 1021 1022pengine_done :- 1023 thread_self(Me), 1024 ( thread_property(Me, status(exception('$aborted'))), 1025 thread_detach(Me), 1026 pengine_self(Pengine) 1027 -> catch(pengine_reply(destroy(Pengine, abort(Pengine))), 1028 error(_,_), true) 1029 ; true 1030 ), 1031 forall(child(_Name, Child), 1032 pengine_destroy(Child)), 1033 pengine_self(Id), 1034 pengine_unregister(Id).
1042:- thread_local wrap_first_answer_in_create_event/2. 1043 1044:- meta_predicate 1045 pengine_prepare_source( , ). 1046 1047pengine_main(Parent, Options, Application) :- 1048 fix_streams, 1049 thread_get_message(pengine_registered(Self)), 1050 nb_setval(pengine_parent, Parent), 1051 pengine_register_user(Options), 1052 set_prolog_flag(mitigate_spectre, true), 1053 catch(in_temporary_module( 1054 Self, 1055 pengine_prepare_source(Application, Options), 1056 pengine_create_and_loop(Self, Application, Options)), 1057 prepare_source_failed, 1058 pengine_terminate(Self)). 1059 1060pengine_create_and_loop(Self, Application, Options) :- 1061 setting(Application:slave_limit, SlaveLimit), 1062 CreateEvent = create(Self, [slave_limit(SlaveLimit)|Extra]), 1063 ( option(ask(Query0), Options) 1064 -> asserta(wrap_first_answer_in_create_event(CreateEvent, Extra)), 1065 ( string(Query0) % string is not callable 1066 -> ( option(template(TemplateS), Options) 1067 -> Ask2 = Query0-TemplateS 1068 ; Ask2 = Query0 1069 ), 1070 catch(ask_to_term(Ask2, Self, Query, Template, Bindings), 1071 Error, true), 1072 ( var(Error) 1073 -> true 1074 ; send_error(Error), 1075 throw(prepare_source_failed) 1076 ) 1077 ; Query = Query0, 1078 option(template(Template), Options, Query), 1079 option(bindings(Bindings), Options, []) 1080 ), 1081 option(chunk(Chunk), Options, 1), 1082 pengine_ask(Self, Query, 1083 [ template(Template), 1084 chunk(Chunk), 1085 bindings(Bindings) 1086 ]) 1087 ; Extra = [], 1088 pengine_reply(CreateEvent) 1089 ), 1090 pengine_main_loop(Self).
1100ask_to_term(Ask-Template, Module, Ask1, Template1, Bindings) :- 1101 !, 1102 format(string(AskTemplate), 't((~s),(~s))', [Template, Ask]), 1103 term_string(t(Template1,Ask1), AskTemplate, 1104 [ variable_names(Bindings0), 1105 module(Module) 1106 ]), 1107 phrase(template_bindings(Template1, Bindings0), Bindings). 1108ask_to_term(Ask, Module, Ask1, Template, Bindings1) :- 1109 term_string(Ask1, Ask, 1110 [ variable_names(Bindings), 1111 module(Module) 1112 ]), 1113 exclude(anon, Bindings, Bindings1), 1114 dict_create(Template, swish_default_template, Bindings1). 1115 1116template_bindings(Var, Bindings) --> 1117 { var(Var) }, !, 1118 ( { var_binding(Bindings, Var, Binding) 1119 } 1120 -> [Binding] 1121 ; [] 1122 ). 1123template_bindings([H|T], Bindings) --> 1124 !, 1125 template_bindings(H, Bindings), 1126 template_bindings(T, Bindings). 1127template_bindings(Compoound, Bindings) --> 1128 { compound(Compoound), !, 1129 compound_name_arguments(Compoound, _, Args) 1130 }, 1131 template_bindings(Args, Bindings). 1132template_bindings(_, _) --> []. 1133 1134var_binding(Bindings, Var, Binding) :- 1135 member(Binding, Bindings), 1136 arg(2, Binding, V), 1137 V == Var, !.
1144fix_streams :- 1145 fix_stream(current_output). 1146 1147fix_stream(Name) :- 1148 is_cgi_stream(Name), 1149 !, 1150 debug(pengine(stream), '~w is a CGI stream!', [Name]), 1151 set_stream(user_output, alias(Name)). 1152fix_stream(_).
1161pengine_prepare_source(Module:Application, Options) :- 1162 setting(Application:program_space, SpaceLimit), 1163 set_module(Module:program_space(SpaceLimit)), 1164 delete_import_module(Module, user), 1165 add_import_module(Module, Application, start), 1166 catch(prep_module(Module, Application, Options), Error, true), 1167 ( var(Error) 1168 -> true 1169 ; send_error(Error), 1170 throw(prepare_source_failed) 1171 ). 1172 1173prep_module(Module, Application, Options) :- 1174 maplist(copy_flag(Module, Application), [var_prefix]), 1175 forall(prepare_module(Module, Application, Options), true), 1176 setup_call_cleanup( 1177 '$set_source_module'(OldModule, Module), 1178 maplist(process_create_option(Module), Options), 1179 '$set_source_module'(OldModule)). 1180 1181copy_flag(Module, Application, Flag) :- 1182 current_prolog_flag(ApplicationFlag, Value), 1183 !, 1184 set_prolog_flag(ModuleFlag, Value). 1185copy_flag(_, _, _). 1186 1187process_create_option(Application, src_text(Text)) :- 1188 !, 1189 pengine_src_text(Text, Application). 1190process_create_option(Application, src_url(URL)) :- 1191 !, 1192 pengine_src_url(URL, Application). 1193process_create_option(_, _).
src_text
and
src_url
options1216pengine_main_loop(ID) :- 1217 catch(guarded_main_loop(ID), abort_query, pengine_aborted(ID)). 1218 1219pengine_aborted(ID) :- 1220 thread_self(Self), 1221 debug(pengine(abort), 'Aborting ~p (thread ~p)', [ID, Self]), 1222 empty_queue, 1223 destroy_or_continue(abort(ID)).
1236guarded_main_loop(ID) :- 1237 pengine_request(Request), 1238 ( Request = destroy 1239 -> debug(pengine(transition), '~q: 2 = ~q => 1', [ID, destroy]), 1240 pengine_terminate(ID) 1241 ; Request = ask(Goal, Options) 1242 -> debug(pengine(transition), '~q: 2 = ~q => 3', [ID, ask(Goal)]), 1243 ask(ID, Goal, Options) 1244 ; debug(pengine(transition), '~q: 2 = ~q => 2', [ID, protocol_error]), 1245 pengine_reply(error(ID, error(protocol_error, _))), 1246 guarded_main_loop(ID) 1247 ). 1248 1249 1250pengine_terminate(ID) :- 1251 pengine_reply(destroy(ID)), 1252 thread_self(Me), % Make the thread silently disappear 1253 thread_detach(Me).
1264solve(Chunk, Template, Goal, ID) :- 1265 prolog_current_choice(Choice), 1266 State = count(Chunk), 1267 statistics(cputime, Epoch), 1268 Time = time(Epoch), 1269 nb_current('$variable_names', Bindings), 1270 filter_template(Template, Bindings, Template2), 1271 '$current_typein_module'(CurrTypeIn), 1272 ( '$set_typein_module'(ID), 1273 call_cleanup(catch(findnsols_no_empty(State, Template2, 1274 set_projection(Goal, Bindings), 1275 Result), 1276 Error, true), 1277 query_done(Det, CurrTypeIn)), 1278 arg(1, Time, T0), 1279 statistics(cputime, T1), 1280 CPUTime is T1-T0, 1281 ( var(Error) 1282 -> projection(Projection), 1283 ( var(Det) 1284 -> pengine_reply(success(ID, Result, Projection, 1285 CPUTime, true)), 1286 more_solutions(ID, Choice, State, Time) 1287 ; !, % commit 1288 destroy_or_continue(success(ID, Result, Projection, 1289 CPUTime, false)) 1290 ) 1291 ; !, % commit 1292 ( Error == abort_query 1293 -> throw(Error) 1294 ; destroy_or_continue(error(ID, Error)) 1295 ) 1296 ) 1297 ; !, % commit 1298 arg(1, Time, T0), 1299 statistics(cputime, T1), 1300 CPUTime is T1-T0, 1301 destroy_or_continue(failure(ID, CPUTime)) 1302 ). 1303solve(_, _, _, _). % leave a choice point 1304 1305query_done(true, CurrTypeIn) :- 1306 '$set_typein_module'(CurrTypeIn).
1315set_projection(Goal, Bindings) :- 1316 b_setval('$variable_names', Bindings), 1317 call(Goal). 1318 1319projection(Projection) :- 1320 nb_current('$variable_names', Bindings), 1321 !, 1322 maplist(var_name, Bindings, Projection). 1323projection([]).
1333filter_template(Template0, Bindings, Template) :- 1334 is_dict(Template0, swish_default_template), 1335 !, 1336 dict_create(Template, swish_default_template, Bindings). 1337filter_template(Template, _Bindings, Template). 1338 1339findnsols_no_empty(N, Template, Goal, List) :- 1340 findnsols(N, Template, Goal, List), 1341 List \== []. 1342 1343destroy_or_continue(Event) :- 1344 arg(1, Event, ID), 1345 ( pengine_property(ID, destroy(true)) 1346 -> thread_self(Me), 1347 thread_detach(Me), 1348 pengine_reply(destroy(ID, Event)) 1349 ; pengine_reply(Event), 1350 garbage_collect, % minimise our footprint 1351 trim_stacks, 1352 guarded_main_loop(ID) 1353 ).
chunk
solutions.next
, but sets the new chunk-size to Count.1371more_solutions(ID, Choice, State, Time) :- 1372 pengine_request(Event), 1373 more_solutions(Event, ID, Choice, State, Time). 1374 1375more_solutions(stop, ID, _Choice, _State, _Time) :- 1376 !, 1377 debug(pengine(transition), '~q: 6 = ~q => 7', [ID, stop]), 1378 destroy_or_continue(stop(ID)). 1379more_solutions(next, ID, _Choice, _State, Time) :- 1380 !, 1381 debug(pengine(transition), '~q: 6 = ~q => 3', [ID, next]), 1382 statistics(cputime, T0), 1383 nb_setarg(1, Time, T0), 1384 fail. 1385more_solutions(next(Count), ID, _Choice, State, Time) :- 1386 Count > 0, 1387 !, 1388 debug(pengine(transition), '~q: 6 = ~q => 3', [ID, next(Count)]), 1389 nb_setarg(1, State, Count), 1390 statistics(cputime, T0), 1391 nb_setarg(1, Time, T0), 1392 fail. 1393more_solutions(ask(Goal, Options), ID, Choice, _State, _Time) :- 1394 !, 1395 debug(pengine(transition), '~q: 6 = ~q => 3', [ID, ask(Goal)]), 1396 prolog_cut_to(Choice), 1397 ask(ID, Goal, Options). 1398more_solutions(destroy, ID, _Choice, _State, _Time) :- 1399 !, 1400 debug(pengine(transition), '~q: 6 = ~q => 1', [ID, destroy]), 1401 pengine_terminate(ID). 1402more_solutions(Event, ID, Choice, State, Time) :- 1403 debug(pengine(transition), '~q: 6 = ~q => 6', [ID, protocol_error(Event)]), 1404 pengine_reply(error(ID, error(protocol_error, _))), 1405 more_solutions(ID, Choice, State, Time).
chunk(N)
option.
1413ask(ID, Goal, Options) :-
1414 catch(prepare_goal(ID, Goal, Goal1, Options), Error, true),
1415 !,
1416 ( var(Error)
1417 -> option(template(Template), Options, Goal),
1418 option(chunk(N), Options, 1),
1419 solve(N, Template, Goal1, ID)
1420 ; pengine_reply(error(ID, Error)),
1421 guarded_main_loop(ID)
1422 ).
Note that expand_goal(Module:GoalIn, GoalOut)
is what we'd like
to write, but this does not work correctly if the user wishes to
expand X:Y
while interpreting X not as the module in which
to run Y. This happens in the CQL package. Possibly we should
disallow this reinterpretation?
1436prepare_goal(ID, Goal0, Module:Goal, Options) :-
1437 option(bindings(Bindings), Options, []),
1438 b_setval('$variable_names', Bindings),
1439 ( prepare_goal(Goal0, Goal1, Options)
1440 -> true
1441 ; Goal1 = Goal0
1442 ),
1443 get_pengine_module(ID, Module),
1444 setup_call_cleanup(
1445 '$set_source_module'(Old, Module),
1446 expand_goal(Goal1, Goal),
1447 '$set_source_module'(_, Old)),
1448 ( pengine_not_sandboxed(ID)
1449 -> true
1450 ; get_pengine_application(ID, App),
1451 setting(App:safe_goal_limit, Limit),
1452 catch(call_with_time_limit(
1453 Limit,
1454 safe_goal(Module:Goal)), E, true)
1455 -> ( var(E)
1456 -> true
1457 ; E = time_limit_exceeded
1458 -> throw(error(sandbox(time_limit_exceeded, Limit),_))
1459 ; throw(E)
1460 )
1461 ).
not_sandboxed(User, Application)
must succeed.
1481pengine_not_sandboxed(ID) :-
1482 pengine_user(ID, User),
1483 pengine_property(ID, application(App)),
1484 not_sandboxed(User, App),
1485 !.
1507pengine_pull_response(Pengine, Options) :- 1508 pengine_remote(Pengine, Server), 1509 !, 1510 remote_pengine_pull_response(Server, Pengine, Options). 1511pengine_pull_response(_ID, _Options).
1520pengine_input(Prompt, Term) :-
1521 pengine_self(Self),
1522 pengine_parent(Parent),
1523 pengine_reply(Parent, prompt(Self, Prompt)),
1524 pengine_request(Request),
1525 ( Request = input(Input)
1526 -> Term = Input
1527 ; Request == destroy
1528 -> abort
1529 ; throw(error(protocol_error,_))
1530 ).
Defined in terms of pengine_send/3, as follows:
pengine_respond(Pengine, Input, Options) :- pengine_send(Pengine, input(Input), Options).
*/
1547pengine_respond(Pengine, Input, Options) :-
1548 pengine_send(Pengine, input(Input), Options).
1557send_error(error(Formal, context(prolog_stack(Frames), Message))) :- 1558 is_list(Frames), 1559 !, 1560 with_output_to(string(Stack), 1561 print_prolog_backtrace(current_output, Frames)), 1562 pengine_self(Self), 1563 replace_blobs(Formal, Formal1), 1564 replace_blobs(Message, Message1), 1565 pengine_reply(error(Self, error(Formal1, 1566 context(prolog_stack(Stack), Message1)))). 1567send_error(Error) :- 1568 pengine_self(Self), 1569 replace_blobs(Error, Error1), 1570 pengine_reply(error(Self, Error1)).
1578replace_blobs(Blob, Atom) :- 1579 blob(Blob, Type), Type \== text, 1580 !, 1581 format(atom(Atom), '~p', [Blob]). 1582replace_blobs(Term0, Term) :- 1583 compound(Term0), 1584 !, 1585 compound_name_arguments(Term0, Name, Args0), 1586 maplist(replace_blobs, Args0, Args), 1587 compound_name_arguments(Term, Name, Args). 1588replace_blobs(Term, Term). 1589 1590 1591/*================= Remote pengines ======================= 1592*/ 1593 1594 1595remote_pengine_create(BaseURL, Options) :- 1596 partition(pengine_create_option, Options, PengineOptions0, RestOptions), 1597 ( option(ask(Query), PengineOptions0), 1598 \+ option(template(_Template), PengineOptions0) 1599 -> PengineOptions = [template(Query)|PengineOptions0] 1600 ; PengineOptions = PengineOptions0 1601 ), 1602 options_to_dict(PengineOptions, PostData), 1603 remote_post_rec(BaseURL, create, PostData, Reply, RestOptions), 1604 arg(1, Reply, ID), 1605 ( option(id(ID2), Options) 1606 -> ID = ID2 1607 ; true 1608 ), 1609 option(alias(Name), Options, ID), 1610 assert(child(Name, ID)), 1611 ( ( functor(Reply, create, _) % actually created 1612 ; functor(Reply, output, _) % compiler messages 1613 ) 1614 -> option(application(Application), PengineOptions, pengine_sandbox), 1615 option(destroy(Destroy), PengineOptions, true), 1616 pengine_register_remote(ID, BaseURL, Application, Destroy) 1617 ; true 1618 ), 1619 thread_self(Queue), 1620 pengine_reply(Queue, Reply). 1621 1622options_to_dict(Options, Dict) :- 1623 select_option(ask(Ask), Options, Options1), 1624 select_option(template(Template), Options1, Options2), 1625 !, 1626 no_numbered_var_in(Ask+Template), 1627 findall(AskString-TemplateString, 1628 ask_template_to_strings(Ask, Template, AskString, TemplateString), 1629 [ AskString-TemplateString ]), 1630 options_to_dict(Options2, Dict0), 1631 Dict = Dict0.put(_{ask:AskString,template:TemplateString}). 1632options_to_dict(Options, Dict) :- 1633 maplist(prolog_option, Options, Options1), 1634 dict_create(Dict, _, Options1). 1635 1636no_numbered_var_in(Term) :- 1637 sub_term(Sub, Term), 1638 subsumes_term('$VAR'(_), Sub), 1639 !, 1640 domain_error(numbered_vars_free_term, Term). 1641no_numbered_var_in(_). 1642 1643ask_template_to_strings(Ask, Template, AskString, TemplateString) :- 1644 numbervars(Ask+Template, 0, _), 1645 WOpts = [ numbervars(true), ignore_ops(true), quoted(true) ], 1646 format(string(AskTemplate), '~W\n~W', [ Ask, WOpts, 1647 Template, WOpts 1648 ]), 1649 split_string(AskTemplate, "\n", "", [AskString, TemplateString]). 1650 1651prolog_option(Option0, Option) :- 1652 create_option_type(Option0, term), 1653 !, 1654 Option0 =.. [Name,Value], 1655 format(string(String), '~k', [Value]), 1656 Option =.. [Name,String]. 1657prolog_option(Option, Option). 1658 1659create_option_type(ask(_), term). 1660create_option_type(template(_), term). 1661create_option_type(application(_), atom). 1662 1663remote_pengine_send(BaseURL, ID, Event, Options) :- 1664 remote_send_rec(BaseURL, send, ID, [event=Event], Reply, Options), 1665 thread_self(Queue), 1666 pengine_reply(Queue, Reply). 1667 1668remote_pengine_pull_response(BaseURL, ID, Options) :- 1669 remote_send_rec(BaseURL, pull_response, ID, [], Reply, Options), 1670 thread_self(Queue), 1671 pengine_reply(Queue, Reply). 1672 1673remote_pengine_abort(BaseURL, ID, Options) :- 1674 remote_send_rec(BaseURL, abort, ID, [], Reply, Options), 1675 thread_self(Queue), 1676 pengine_reply(Queue, Reply).
1683remote_send_rec(Server, Action, ID, [event=Event], Reply, Options) :- 1684 !, 1685 server_url(Server, Action, [id=ID], URL), 1686 http_open(URL, Stream, % putting this in setup_call_cleanup/3 1687 [ post(prolog(Event)) % makes it impossible to interrupt. 1688 | Options 1689 ]), 1690 call_cleanup( 1691 read_prolog_reply(Stream, Reply), 1692 close(Stream)). 1693remote_send_rec(Server, Action, ID, Params, Reply, Options) :- 1694 server_url(Server, Action, [id=ID|Params], URL), 1695 http_open(URL, Stream, Options), 1696 call_cleanup( 1697 read_prolog_reply(Stream, Reply), 1698 close(Stream)). 1699 1700remote_post_rec(Server, Action, Data, Reply, Options) :- 1701 server_url(Server, Action, [], URL), 1702 probe(Action, URL), 1703 http_open(URL, Stream, 1704 [ post(json(Data)) 1705 | Options 1706 ]), 1707 call_cleanup( 1708 read_prolog_reply(Stream, Reply), 1709 close(Stream)).
1717probe(create, URL) :- 1718 !, 1719 http_open(URL, Stream, [method(options)]), 1720 close(Stream). 1721probe(_, _). 1722 1723read_prolog_reply(In, Reply) :- 1724 set_stream(In, encoding(utf8)), 1725 read(In, Reply0), 1726 rebind_cycles(Reply0, Reply). 1727 1728rebind_cycles(@(Reply, Bindings), Reply) :- 1729 is_list(Bindings), 1730 !, 1731 maplist(bind, Bindings). 1732rebind_cycles(Reply, Reply). 1733 1734bind(Var = Value) :- 1735 Var = Value. 1736 1737server_url(Server, Action, Params, URL) :- 1738 uri_components(Server, Components0), 1739 uri_query_components(Query, Params), 1740 uri_data(path, Components0, Path0), 1741 atom_concat('pengine/', Action, PAction), 1742 directory_file_path(Path0, PAction, Path), 1743 uri_data(path, Components0, Path, Components), 1744 uri_data(search, Components, Query), 1745 uri_components(URL, Components).
Valid options are:
timeout
.1766pengine_event(Event) :- 1767 pengine_event(Event, []). 1768 1769pengine_event(Event, Options) :- 1770 thread_self(Self), 1771 option(listen(Id), Options, _), 1772 ( thread_get_message(Self, pengine_event(Id, Event), Options) 1773 -> true 1774 ; Event = timeout 1775 ), 1776 update_remote_destroy(Event). 1777 1778update_remote_destroy(Event) :- 1779 destroy_event(Event), 1780 arg(1, Event, Id), 1781 pengine_remote(Id, _Server), 1782 !, 1783 pengine_unregister_remote(Id). 1784update_remote_destroy(_). 1785 1786destroy_event(destroy(_)). 1787destroy_event(destroy(_,_)). 1788destroy_event(create(_,Features)) :- 1789 memberchk(answer(Answer), Features), 1790 !, 1791 nonvar(Answer), 1792 destroy_event(Answer).
ignore(call(Closure, E))
. A
closure thus acts as a handler for the event. Some events are also
treated specially:
Valid options are:
all
,
all_but_sender
or a Prolog list of NameOrIDs. [not yet
implemented]*/
1821pengine_event_loop(Closure, Options) :- 1822 child(_,_), 1823 !, 1824 pengine_event(Event), 1825 ( option(autoforward(all), Options) % TODO: Implement all_but_sender and list of IDs 1826 -> forall(child(_,ID), pengine_send(ID, Event)) 1827 ; true 1828 ), 1829 pengine_event_loop(Event, Closure, Options). 1830pengine_event_loop(_, _). 1831 1832:- meta_predicate 1833 pengine_process_event( , , , ). 1834 1835pengine_event_loop(Event, Closure, Options) :- 1836 pengine_process_event(Event, Closure, Continue, Options), 1837 ( Continue == true 1838 -> pengine_event_loop(Closure, Options) 1839 ; true 1840 ). 1841 1842pengine_process_event(create(ID, T), Closure, Continue, Options) :- 1843 debug(pengine(transition), '~q: 1 = /~q => 2', [ID, create(T)]), 1844 ( select(answer(First), T, T1) 1845 -> ignore(call(Closure, create(ID, T1))), 1846 pengine_process_event(First, Closure, Continue, Options) 1847 ; ignore(call(Closure, create(ID, T))), 1848 Continue = true 1849 ). 1850pengine_process_event(output(ID, Msg), Closure, true, _Options) :- 1851 debug(pengine(transition), '~q: 3 = /~q => 4', [ID, output(Msg)]), 1852 ignore(call(Closure, output(ID, Msg))), 1853 pengine_pull_response(ID, []). 1854pengine_process_event(debug(ID, Msg), Closure, true, _Options) :- 1855 debug(pengine(transition), '~q: 3 = /~q => 4', [ID, debug(Msg)]), 1856 ignore(call(Closure, debug(ID, Msg))), 1857 pengine_pull_response(ID, []). 1858pengine_process_event(prompt(ID, Term), Closure, true, _Options) :- 1859 debug(pengine(transition), '~q: 3 = /~q => 5', [ID, prompt(Term)]), 1860 ignore(call(Closure, prompt(ID, Term))). 1861pengine_process_event(success(ID, Sol, _Proj, _Time, More), Closure, true, _) :- 1862 debug(pengine(transition), '~q: 3 = /~q => 6/2', [ID, success(Sol, More)]), 1863 ignore(call(Closure, success(ID, Sol, More))). 1864pengine_process_event(failure(ID, _Time), Closure, true, _Options) :- 1865 debug(pengine(transition), '~q: 3 = /~q => 2', [ID, failure]), 1866 ignore(call(Closure, failure(ID))). 1867pengine_process_event(error(ID, Error), Closure, Continue, _Options) :- 1868 debug(pengine(transition), '~q: 3 = /~q => 2', [ID, error(Error)]), 1869 ( call(Closure, error(ID, Error)) 1870 -> Continue = true 1871 ; forall(child(_,Child), pengine_destroy(Child)), 1872 throw(Error) 1873 ). 1874pengine_process_event(stop(ID), Closure, true, _Options) :- 1875 debug(pengine(transition), '~q: 7 = /~q => 2', [ID, stop]), 1876 ignore(call(Closure, stop(ID))). 1877pengine_process_event(destroy(ID, Event), Closure, Continue, Options) :- 1878 pengine_process_event(Event, Closure, _, Options), 1879 pengine_process_event(destroy(ID), Closure, Continue, Options). 1880pengine_process_event(destroy(ID), Closure, true, _Options) :- 1881 retractall(child(_,ID)), 1882 debug(pengine(transition), '~q: 1 = /~q => 0', [ID, destroy]), 1883 ignore(call(Closure, destroy(ID))).
copy_term_nat(Query, Copy), % attributes are not copied to the server call(Copy), % executed on server at URL Query = Copy.
Valid options are:
pengines:time_limit
.Remaining options (except the server option) are passed to pengine_create/1. */
1912pengine_rpc(URL, Query) :- 1913 pengine_rpc(URL, Query, []). 1914 1915pengine_rpc(URL, Query, M:Options0) :- 1916 translate_local_sources(Options0, Options1, M), 1917 ( option(timeout(_), Options1) 1918 -> Options = Options1 1919 ; setting(time_limit, Limit), 1920 Options = [timeout(Limit)|Options1] 1921 ), 1922 term_variables(Query, Vars), 1923 Template =.. [v|Vars], 1924 State = destroy(true), % modified by process_event/4 1925 setup_call_catcher_cleanup( 1926 pengine_create([ ask(Query), 1927 template(Template), 1928 server(URL), 1929 id(Id) 1930 | Options 1931 ]), 1932 wait_event(Template, State, [listen(Id)|Options]), 1933 Why, 1934 pengine_destroy_and_wait(State, Id, Why)). 1935 1936pengine_destroy_and_wait(destroy(true), Id, Why) :- 1937 !, 1938 debug(pengine(rpc), 'Destroying RPC because of ~p', [Why]), 1939 pengine_destroy(Id), 1940 wait_destroy(Id, 10). 1941pengine_destroy_and_wait(_, _, Why) :- 1942 debug(pengine(rpc), 'Not destroying RPC (~p)', [Why]). 1943 1944wait_destroy(Id, _) :- 1945 \+ child(_, Id), 1946 !. 1947wait_destroy(Id, N) :- 1948 pengine_event(Event, [listen(Id),timeout(10)]), 1949 !, 1950 ( destroy_event(Event) 1951 -> retractall(child(_,Id)) 1952 ; succ(N1, N) 1953 -> wait_destroy(Id, N1) 1954 ; debug(pengine(rpc), 'RPC did not answer to destroy ~p', [Id]), 1955 pengine_unregister_remote(Id), 1956 retractall(child(_,Id)) 1957 ). 1958 1959wait_event(Template, State, Options) :- 1960 pengine_event(Event, Options), 1961 debug(pengine(event), 'Received ~p', [Event]), 1962 process_event(Event, Template, State, Options). 1963 1964process_event(create(_ID, Features), Template, State, Options) :- 1965 memberchk(answer(First), Features), 1966 process_event(First, Template, State, Options). 1967process_event(error(_ID, Error), _Template, _, _Options) :- 1968 throw(Error). 1969process_event(failure(_ID, _Time), _Template, _, _Options) :- 1970 fail. 1971process_event(prompt(ID, Prompt), Template, State, Options) :- 1972 pengine_rpc_prompt(ID, Prompt, Reply), 1973 pengine_send(ID, input(Reply)), 1974 wait_event(Template, State, Options). 1975process_event(output(ID, Term), Template, State, Options) :- 1976 pengine_rpc_output(ID, Term), 1977 pengine_pull_response(ID, Options), 1978 wait_event(Template, State, Options). 1979process_event(debug(ID, Message), Template, State, Options) :- 1980 debug(pengine(debug), '~w', [Message]), 1981 pengine_pull_response(ID, Options), 1982 wait_event(Template, State, Options). 1983process_event(success(_ID, Solutions, _Proj, _Time, false), 1984 Template, _, _Options) :- 1985 !, 1986 member(Template, Solutions). 1987process_event(success(ID, Solutions, _Proj, _Time, true), 1988 Template, State, Options) :- 1989 ( member(Template, Solutions) 1990 ; pengine_next(ID, Options), 1991 wait_event(Template, State, Options) 1992 ). 1993process_event(destroy(ID, Event), Template, State, Options) :- 1994 !, 1995 retractall(child(_,ID)), 1996 nb_setarg(1, State, false), 1997 debug(pengine(destroy), 'State: ~p~n', [State]), 1998 process_event(Event, Template, State, Options). 1999% compatibility with older versions of the protocol. 2000process_event(success(ID, Solutions, Time, More), 2001 Template, State, Options) :- 2002 process_event(success(ID, Solutions, _Proj, Time, More), 2003 Template, State, Options). 2004 2005 2006pengine_rpc_prompt(ID, Prompt, Term) :- 2007 prompt(ID, Prompt, Term0), 2008 !, 2009 Term = Term0. 2010pengine_rpc_prompt(_ID, Prompt, Term) :- 2011 setup_call_cleanup( 2012 prompt(Old, Prompt), 2013 read(Term), 2014 prompt(_, Old)). 2015 2016pengine_rpc_output(ID, Term) :- 2017 output(ID, Term), 2018 !. 2019pengine_rpc_output(_ID, Term) :- 2020 print(Term).
2027:- multifile prompt/3.
2034:- multifile output/2. 2035 2036 2037/*================= HTTP handlers ======================= 2038*/ 2039 2040% Declare HTTP locations we serve and how. Note that we use 2041% time_limit(inifinite) because pengines have their own timeout. Also 2042% note that we use spawn. This is needed because we can easily get 2043% many clients waiting for some action on a pengine to complete. 2044% Without spawning, we would quickly exhaust the worker pool of the 2045% HTTP server. 2046% 2047% FIXME: probably we should wait for a short time for the pengine on 2048% the default worker thread. Only if that time has expired, we can 2049% call http_spawn/2 to continue waiting on a new thread. That would 2050% improve the performance and reduce the usage of threads. 2051 2052:- http_handler(root(pengine), http_404([]), 2053 [ id(pengines) ]). 2054:- http_handler(root(pengine/create), http_pengine_create, 2055 [ time_limit(infinite), spawn([]) ]). 2056:- http_handler(root(pengine/send), http_pengine_send, 2057 [ time_limit(infinite), spawn([]) ]). 2058:- http_handler(root(pengine/pull_response), http_pengine_pull_response, 2059 [ time_limit(infinite), spawn([]) ]). 2060:- http_handler(root(pengine/abort), http_pengine_abort, []). 2061:- http_handler(root(pengine/ping), http_pengine_ping, []). 2062:- http_handler(root(pengine/destroy_all), http_pengine_destroy_all, []). 2063 2064:- http_handler(root(pengine/'pengines.js'), 2065 http_reply_file(library('http/web/js/pengines.js'), []), []). 2066:- http_handler(root(pengine/'plterm.css'), 2067 http_reply_file(library('http/web/css/plterm.css'), []), []).
application/json
and as
www-form-encoded
. Accepted parameters:
Parameter | Default | Comment |
format | prolog | Output format |
application | pengine_sandbox | Pengine application |
chunk | 1 | Chunk-size for results |
solutions | chunked | If all , emit all results |
ask | - | The query |
template | - | Output template |
src_text | "" | Program |
src_url | - | Program to download |
disposition | - | Download location |
Note that solutions=all internally uses chunking to obtain the results from the pengine, but the results are combined in a single HTTP reply. This is currently only implemented by the CSV backend that is part of SWISH for downloading unbounded result sets with limited memory resources.
2094http_pengine_create(Request) :- 2095 reply_options(Request, [post]), 2096 !. 2097http_pengine_create(Request) :- 2098 memberchk(content_type(CT), Request), 2099 sub_atom(CT, 0, _, _, 'application/json'), 2100 !, 2101 http_read_json_dict(Request, Dict), 2102 dict_atom_option(format, Dict, Format, prolog), 2103 dict_atom_option(application, Dict, Application, pengine_sandbox), 2104 http_pengine_create(Request, Application, Format, Dict). 2105http_pengine_create(Request) :- 2106 Optional = [optional(true)], 2107 OptString = [string|Optional], 2108 Form = [ format(Format, [default(prolog)]), 2109 application(Application, [default(pengine_sandbox)]), 2110 chunk(_, [integer, default(1)]), 2111 solutions(_, [oneof([all,chunked]), default(chunked)]), 2112 ask(_, OptString), 2113 template(_, OptString), 2114 src_text(_, OptString), 2115 disposition(_, OptString), 2116 src_url(_, Optional) 2117 ], 2118 http_parameters(Request, Form), 2119 form_dict(Form, Dict), 2120 http_pengine_create(Request, Application, Format, Dict). 2121 2122dict_atom_option(Key, Dict, Atom, Default) :- 2123 ( get_dict(Key, Dict, String) 2124 -> atom_string(Atom, String) 2125 ; Atom = Default 2126 ). 2127 2128form_dict(Form, Dict) :- 2129 form_values(Form, Pairs), 2130 dict_pairs(Dict, _, Pairs). 2131 2132form_values([], []). 2133form_values([H|T], Pairs) :- 2134 arg(1, H, Value), 2135 nonvar(Value), 2136 !, 2137 functor(H, Name, _), 2138 Pairs = [Name-Value|PairsT], 2139 form_values(T, PairsT). 2140form_values([_|T], Pairs) :- 2141 form_values(T, Pairs).
2146http_pengine_create(Request, Application, Format, Dict) :- 2147 current_application(Application), 2148 !, 2149 allowed(Request, Application), 2150 authenticate(Request, Application, UserOptions), 2151 dict_to_options(Dict, Application, CreateOptions0), 2152 append(UserOptions, CreateOptions0, CreateOptions), 2153 pengine_uuid(Pengine), 2154 message_queue_create(Queue, [max_size(25)]), 2155 setting(Application:time_limit, TimeLimit), 2156 get_time(Now), 2157 asserta(pengine_queue(Pengine, Queue, TimeLimit, Now)), 2158 broadcast(pengine(create(Pengine, Application, CreateOptions))), 2159 create(Queue, Pengine, CreateOptions, http, Application), 2160 create_wait_and_output_result(Pengine, Queue, Format, 2161 TimeLimit, Dict), 2162 gc_abandoned_queues. 2163http_pengine_create(_Request, Application, Format, _Dict) :- 2164 Error = existence_error(pengine_application, Application), 2165 pengine_uuid(ID), 2166 output_result(Format, error(ID, error(Error, _))). 2167 2168 2169dict_to_options(Dict, Application, CreateOptions) :- 2170 dict_pairs(Dict, _, Pairs), 2171 pairs_create_options(Pairs, Application, CreateOptions). 2172 2173pairs_create_options([], _, []) :- !. 2174pairs_create_options([N-V0|T0], App, [Opt|T]) :- 2175 Opt =.. [N,V], 2176 pengine_create_option(Opt), N \== user, 2177 !, 2178 ( create_option_type(Opt, atom) 2179 -> atom_string(V, V0) % term creation must be done if 2180 ; V = V0 % we created the source and know 2181 ), % the operators. 2182 pairs_create_options(T0, App, T). 2183pairs_create_options([_|T0], App, T) :- 2184 pairs_create_options(T0, App, T).
time_limit
,
Pengine is aborted and the result is error(time_limit_exceeded,
_)
.
2195wait_and_output_result(Pengine, Queue, Format, TimeLimit) :-
2196 ( catch(thread_get_message(Queue, pengine_event(_, Event),
2197 [ timeout(TimeLimit)
2198 ]),
2199 Error, true)
2200 -> ( var(Error)
2201 -> debug(pengine(wait), 'Got ~q from ~q', [Event, Queue]),
2202 ignore(destroy_queue_from_http(Pengine, Event, Queue)),
2203 output_result(Format, Event)
2204 ; output_result(Format, died(Pengine))
2205 )
2206 ; time_limit_exceeded(Pengine, Format)
2207 ).
disposition
key to denote the
download location.2216create_wait_and_output_result(Pengine, Queue, Format, TimeLimit, Dict) :- 2217 get_dict(solutions, Dict, all), 2218 !, 2219 between(1, infinite, Page), 2220 ( catch(thread_get_message(Queue, pengine_event(_, Event), 2221 [ timeout(TimeLimit) 2222 ]), 2223 Error, true) 2224 -> ( var(Error) 2225 -> debug(pengine(wait), 'Page ~D: got ~q from ~q', [Page, Event, Queue]), 2226 ( destroy_queue_from_http(Pengine, Event, Queue) 2227 -> !, output_result(Format, page(Page, Event)) 2228 ; is_more_event(Event) 2229 -> pengine_thread(Pengine, Thread), 2230 thread_send_message(Thread, pengine_request(next)), 2231 output_result(Format, page(Page, Event), Dict), 2232 fail 2233 ; !, output_result(Format, page(Page, Event), Dict) 2234 ) 2235 ; !, output_result(Format, died(Pengine)) 2236 ) 2237 ; !, time_limit_exceeded(Pengine, Format) 2238 ), 2239 !. 2240create_wait_and_output_result(Pengine, Queue, Format, TimeLimit, _Dict) :- 2241 wait_and_output_result(Pengine, Queue, Format, TimeLimit). 2242 2243is_more_event(success(_Id, _Answers, _Projection, _Time, true)). 2244is_more_event(create(_, Options)) :- 2245 memberchk(answer(Event), Options), 2246 is_more_event(Event).
2260time_limit_exceeded(Pengine, Format) :-
2261 call_cleanup(
2262 pengine_destroy(Pengine, [force(true)]),
2263 output_result(Format,
2264 destroy(Pengine,
2265 error(Pengine, time_limit_exceeded)))).
2280destroy_queue_from_http(ID, _, Queue) :- 2281 output_queue(ID, Queue, _), 2282 !, 2283 destroy_queue_if_empty(Queue). 2284destroy_queue_from_http(ID, Event, Queue) :- 2285 debug(pengine(destroy), 'DESTROY? ~p', [Event]), 2286 is_destroy_event(Event), 2287 !, 2288 message_queue_property(Queue, size(Waiting)), 2289 debug(pengine(destroy), 'Destroy ~p (waiting ~D)', [Queue, Waiting]), 2290 with_mutex(pengine, sync_destroy_queue_from_http(ID, Queue)). 2291 2292is_destroy_event(destroy(_)). 2293is_destroy_event(destroy(_,_)). 2294is_destroy_event(create(_, Options)) :- 2295 memberchk(answer(Event), Options), 2296 is_destroy_event(Event). 2297 2298destroy_queue_if_empty(Queue) :- 2299 thread_peek_message(Queue, _), 2300 !. 2301destroy_queue_if_empty(Queue) :- 2302 retractall(output_queue(_, Queue, _)), 2303 message_queue_destroy(Queue).
2311:- dynamic 2312 last_gc/1. 2313 2314gc_abandoned_queues :- 2315 consider_queue_gc, 2316 !, 2317 get_time(Now), 2318 ( output_queue(_, Queue, Time), 2319 Now-Time > 15*60, 2320 retract(output_queue(_, Queue, Time)), 2321 message_queue_destroy(Queue), 2322 fail 2323 ; retractall(last_gc(_)), 2324 asserta(last_gc(Now)) 2325 ). 2326gc_abandoned_queues. 2327 2328consider_queue_gc :- 2329 predicate_property(output_queue(_,_,_), number_of_clauses(N)), 2330 N > 100, 2331 ( last_gc(Time), 2332 get_time(Now), 2333 Now-Time > 5*60 2334 -> true 2335 ; \+ last_gc(_) 2336 ).
2354:- dynamic output_queue_destroyed/1. 2355 2356sync_destroy_queue_from_http(ID, Queue) :- 2357 ( output_queue(ID, Queue, _) 2358 -> destroy_queue_if_empty(Queue) 2359 ; thread_peek_message(Queue, pengine_event(_, output(_,_))) 2360 -> debug(pengine(destroy), 'Delay destruction of ~p because of output', 2361 [Queue]), 2362 get_time(Now), 2363 asserta(output_queue(ID, Queue, Now)) 2364 ; message_queue_destroy(Queue), 2365 asserta(output_queue_destroyed(Queue)) 2366 ).
pengine
held.2373sync_destroy_queue_from_pengine(ID, Queue) :- 2374 ( retract(output_queue_destroyed(Queue)) 2375 -> true 2376 ; get_time(Now), 2377 asserta(output_queue(ID, Queue, Now)) 2378 ), 2379 retractall(pengine_queue(ID, Queue, _, _)). 2380 2381 2382http_pengine_send(Request) :- 2383 reply_options(Request, [get,post]), 2384 !. 2385http_pengine_send(Request) :- 2386 http_parameters(Request, 2387 [ id(ID, [ type(atom) ]), 2388 event(EventString, [optional(true)]), 2389 format(Format, [default(prolog)]) 2390 ]), 2391 get_pengine_module(ID, Module), 2392 ( current_module(Module) % avoid re-creating the module 2393 -> catch(( read_event(Request, EventString, Module, Event0, Bindings), 2394 fix_bindings(Format, Event0, Bindings, Event1) 2395 ), 2396 Error, 2397 true), 2398 ( var(Error) 2399 -> debug(pengine(event), 'HTTP send: ~p', [Event1]), 2400 ( pengine_thread(ID, Thread) 2401 -> pengine_queue(ID, Queue, TimeLimit, _), 2402 random_delay, 2403 broadcast(pengine(send(ID, Event1))), 2404 thread_send_message(Thread, pengine_request(Event1)), 2405 wait_and_output_result(ID, Queue, Format, TimeLimit) 2406 ; atom(ID) 2407 -> pengine_died(Format, ID) 2408 ; http_404([], Request) 2409 ) 2410 ; output_result(Format, error(ID, Error)) 2411 ) 2412 ; debug(pengine(event), 'Pengine module ~q vanished', [Module]), 2413 discard_post_data(Request), 2414 pengine_died(Format, ID) 2415 ). 2416 2417pengine_died(Format, Pengine) :- 2418 output_result(Format, error(Pengine, 2419 error(existence_error(pengine, Pengine),_))).
event
parameter or as a posted document.2427read_event(_Request, EventString, Module, Event, Bindings) :- 2428 nonvar(EventString), 2429 !, 2430 term_string(Event, EventString, 2431 [ variable_names(Bindings), 2432 module(Module) 2433 ]). 2434read_event(Request, _EventString, Module, Event, Bindings) :- 2435 option(method(post), Request), 2436 http_read_data(Request, Event, 2437 [ content_type('application/x-prolog'), 2438 module(Module), 2439 variable_names(Bindings) 2440 ]).
2446discard_post_data(Request) :- 2447 option(method(post), Request), 2448 !, 2449 setup_call_cleanup( 2450 open_null_stream(NULL), 2451 http_read_data(Request, _, [to(stream(NULL))]), 2452 close(NULL)). 2453discard_post_data(_).
json(-s)
Format from the variables in
the asked Goal. Variables starting with an underscore, followed
by an capital letter are ignored from the template.2461fix_bindings(Format, 2462 ask(Goal, Options0), Bindings, 2463 ask(Goal, NewOptions)) :- 2464 json_lang(Format), 2465 !, 2466 exclude(anon, Bindings, NamedBindings), 2467 template(NamedBindings, Template, Options0, Options1), 2468 select_option(chunk(Paging), Options1, Options2, 1), 2469 NewOptions = [ template(Template), 2470 chunk(Paging), 2471 bindings(NamedBindings) 2472 | Options2 2473 ]. 2474fix_bindings(_, Command, _, Command). 2475 2476template(_, Template, Options0, Options) :- 2477 select_option(template(Template), Options0, Options), 2478 !. 2479template(Bindings, Template, Options, Options) :- 2480 dict_create(Template, swish_default_template, Bindings). 2481 2482anon(Name=_) :- 2483 sub_atom(Name, 0, _, _, '_'), 2484 sub_atom(Name, 1, 1, _, Next), 2485 char_type(Next, prolog_var_start). 2486 2487var_name(Name=_, Name).
2494json_lang(json) :- !. 2495json_lang(Format) :- 2496 sub_atom(Format, 0, _, _, 'json-').
2503http_pengine_pull_response(Request) :- 2504 reply_options(Request, [get]), 2505 !. 2506http_pengine_pull_response(Request) :- 2507 http_parameters(Request, 2508 [ id(ID, []), 2509 format(Format, [default(prolog)]) 2510 ]), 2511 ( ( pengine_queue(ID, Queue, TimeLimit, _) 2512 -> true 2513 ; output_queue(ID, Queue, _), 2514 TimeLimit = 0 2515 ) 2516 -> wait_and_output_result(ID, Queue, Format, TimeLimit) 2517 ; http_404([], Request) 2518 ).
2527http_pengine_abort(Request) :- 2528 reply_options(Request, [get]), 2529 !. 2530http_pengine_abort(Request) :- 2531 http_parameters(Request, 2532 [ id(ID, []), 2533 format(Format, [default(prolog)]) 2534 ]), 2535 ( pengine_thread(ID, _Thread), 2536 pengine_queue(ID, Queue, TimeLimit, _) 2537 -> broadcast(pengine(abort(ID))), 2538 abort_pending_output(ID), 2539 pengine_abort(ID), 2540 wait_and_output_result(ID, Queue, Format, TimeLimit) 2541 ; http_404([], Request) 2542 ). 2543 2544http_pengine_destroy_all(Request) :- 2545 reply_options(Request, [get]), 2546 !. 2547http_pengine_destroy_all(Request) :- 2548 http_parameters(Request, 2549 [ ids(IDsAtom, []) 2550 ]), 2551 atomic_list_concat(IDs, ',', IDsAtom), 2552 forall(member(ID, IDs), 2553 pengine_destroy(ID, [force(true)])), 2554 reply_json("ok").
status(Pengine, Stats)
is created, where Stats
is the return of thread_statistics/2.2562http_pengine_ping(Request) :- 2563 reply_options(Request, [get]), 2564 !. 2565http_pengine_ping(Request) :- 2566 http_parameters(Request, 2567 [ id(Pengine, []), 2568 format(Format, [default(prolog)]) 2569 ]), 2570 ( pengine_thread(Pengine, Thread), 2571 catch(thread_statistics(Thread, Stats), _, fail) 2572 -> output_result(Format, ping(Pengine, Stats)) 2573 ; output_result(Format, died(Pengine)) 2574 ).
prolog
, json
or json-s
.2583:- dynamic 2584 pengine_replying/2. % +Pengine, +Thread 2585 2586output_result(Format, Event) :- 2587 arg(1, Event, Pengine), 2588 thread_self(Thread), 2589 setup_call_cleanup( 2590 asserta(pengine_replying(Pengine, Thread), Ref), 2591 catch(output_result(Format, Event, _{}), 2592 pengine_abort_output, 2593 true), 2594 erase(Ref)). 2595 2596output_result(prolog, Event, _) :- 2597 !, 2598 format('Content-type: text/x-prolog; charset=UTF-8~n~n'), 2599 write_term(Event, 2600 [ quoted(true), 2601 ignore_ops(true), 2602 fullstop(true), 2603 blobs(portray), 2604 portray_goal(portray_blob), 2605 nl(true) 2606 ]). 2607output_result(Lang, Event, Dict) :- 2608 write_result(Lang, Event, Dict), 2609 !. 2610output_result(Lang, Event, _) :- 2611 json_lang(Lang), 2612 !, 2613 ( event_term_to_json_data(Event, JSON, Lang) 2614 -> cors_enable, 2615 disable_client_cache, 2616 reply_json(JSON) 2617 ; assertion(event_term_to_json_data(Event, _, Lang)) 2618 ). 2619output_result(Lang, _Event, _) :- % FIXME: allow for non-JSON format 2620 domain_error(pengine_format, Lang).
'$BLOB'(Type)
.
Future versions may include more info, depending on Type.2630:- public portray_blob/2. % called from write-term 2631portray_blob(Blob, _Options) :- 2632 blob(Blob, Type), 2633 writeq('$BLOB'(Type)).
2640abort_pending_output(Pengine) :- 2641 forall(pengine_replying(Pengine, Thread), 2642 abort_output_thread(Thread)). 2643 2644abort_output_thread(Thread) :- 2645 catch(thread_signal(Thread, throw(pengine_abort_output)), 2646 error(existence_error(thread, _), _), 2647 true).
prolog
and various JSON dialects. The hook
event_to_json/3 can be used to refine the JSON dialects. This
hook must be used if a completely different output format is
desired.2663disable_client_cache :- 2664 format('Cache-Control: no-cache, no-store, must-revalidate\r\n\c 2665 Pragma: no-cache\r\n\c 2666 Expires: 0\r\n'). 2667 2668event_term_to_json_data(Event, JSON, Lang) :- 2669 event_to_json(Event, JSON, Lang), 2670 !. 2671event_term_to_json_data(success(ID, Bindings0, Projection, Time, More), 2672 json{event:success, id:ID, time:Time, 2673 data:Bindings, more:More, projection:Projection}, 2674 json) :- 2675 !, 2676 term_to_json(Bindings0, Bindings). 2677event_term_to_json_data(destroy(ID, Event), 2678 json{event:destroy, id:ID, data:JSON}, 2679 Style) :- 2680 !, 2681 event_term_to_json_data(Event, JSON, Style). 2682event_term_to_json_data(create(ID, Features0), JSON, Style) :- 2683 !, 2684 ( select(answer(First0), Features0, Features1) 2685 -> event_term_to_json_data(First0, First, Style), 2686 Features = [answer(First)|Features1] 2687 ; Features = Features0 2688 ), 2689 dict_create(JSON, json, [event(create), id(ID)|Features]). 2690event_term_to_json_data(destroy(ID, Event), 2691 json{event:destroy, id:ID, data:JSON}, Style) :- 2692 !, 2693 event_term_to_json_data(Event, JSON, Style). 2694event_term_to_json_data(error(ID, ErrorTerm), Error, _Style) :- 2695 !, 2696 Error0 = json{event:error, id:ID, data:Message}, 2697 add_error_details(ErrorTerm, Error0, Error), 2698 message_to_string(ErrorTerm, Message). 2699event_term_to_json_data(failure(ID, Time), 2700 json{event:failure, id:ID, time:Time}, _) :- 2701 !. 2702event_term_to_json_data(EventTerm, json{event:F, id:ID}, _) :- 2703 functor(EventTerm, F, 1), 2704 !, 2705 arg(1, EventTerm, ID). 2706event_term_to_json_data(EventTerm, json{event:F, id:ID, data:JSON}, _) :- 2707 functor(EventTerm, F, 2), 2708 arg(1, EventTerm, ID), 2709 arg(2, EventTerm, Data), 2710 term_to_json(Data, JSON). 2711 2712:- public add_error_details/3.
pengines_io.pl
.
2719add_error_details(Error, JSON0, JSON) :-
2720 add_error_code(Error, JSON0, JSON1),
2721 add_error_location(Error, JSON1, JSON).
code
field to JSON0 of Error is an ISO error term. The error
code is the functor name of the formal part of the error, e.g.,
syntax_error
, type_error
, etc. Some errors carry more
information:
2734add_error_code(error(existence_error(Type, Obj), _), Error0, Error) :- 2735 atom(Type), 2736 !, 2737 to_atomic(Obj, Value), 2738 Error = Error0.put(_{code:existence_error, arg1:Type, arg2:Value}). 2739add_error_code(error(Formal, _), Error0, Error) :- 2740 callable(Formal), 2741 !, 2742 functor(Formal, Code, _), 2743 Error = Error0.put(code, Code). 2744add_error_code(_, Error, Error). 2745 2746% What to do with large integers? 2747to_atomic(Obj, Atomic) :- atom(Obj), !, Atomic = Obj. 2748to_atomic(Obj, Atomic) :- number(Obj), !, Atomic = Obj. 2749to_atomic(Obj, Atomic) :- string(Obj), !, Atomic = Obj. 2750to_atomic(Obj, Atomic) :- term_string(Obj, Atomic).
location
property if the error can be associated with a
source location. The location is an object with properties file
and line
and, if available, the character location in the line.2759add_error_location(error(_, file(Path, Line, -1, _CharNo)), Term0, Term) :- 2760 atom(Path), integer(Line), 2761 !, 2762 Term = Term0.put(_{location:_{file:Path, line:Line}}). 2763add_error_location(error(_, file(Path, Line, Ch, _CharNo)), Term0, Term) :- 2764 atom(Path), integer(Line), integer(Ch), 2765 !, 2766 Term = Term0.put(_{location:_{file:Path, line:Line, ch:Ch}}). 2767add_error_location(_, Term, Term).
success(ID, Bindings, Projection, Time, More)
and output(ID,
Term)
into a format suitable for processing at the client side.2778%:- multifile pengines:event_to_json/3. 2779 2780 2781 /******************************* 2782 * ACCESS CONTROL * 2783 *******************************/
forbidden
header if contact is not allowed.2790allowed(Request, Application) :- 2791 setting(Application:allow_from, Allow), 2792 match_peer(Request, Allow), 2793 setting(Application:deny_from, Deny), 2794 \+ match_peer(Request, Deny), 2795 !. 2796allowed(Request, _Application) :- 2797 memberchk(request_uri(Here), Request), 2798 throw(http_reply(forbidden(Here))). 2799 2800match_peer(_, Allowed) :- 2801 memberchk(*, Allowed), 2802 !. 2803match_peer(_, []) :- !, fail. 2804match_peer(Request, Allowed) :- 2805 http_peer(Request, Peer), 2806 debug(pengine(allow), 'Peer: ~q, Allow: ~q', [Peer, Allowed]), 2807 ( memberchk(Peer, Allowed) 2808 -> true 2809 ; member(Pattern, Allowed), 2810 match_peer_pattern(Pattern, Peer) 2811 ). 2812 2813match_peer_pattern(Pattern, Peer) :- 2814 ip_term(Pattern, IP), 2815 ip_term(Peer, IP), 2816 !. 2817 2818ip_term(Peer, Pattern) :- 2819 split_string(Peer, ".", "", PartStrings), 2820 ip_pattern(PartStrings, Pattern). 2821 2822ip_pattern([], []). 2823ip_pattern([*], _) :- !. 2824ip_pattern([S|T0], [N|T]) :- 2825 number_string(N, S), 2826 ip_pattern(T0, T).
[user(User)]
, []
or
an exception.2834authenticate(Request, Application, UserOptions) :- 2835 authentication_hook(Request, Application, User), 2836 !, 2837 must_be(ground, User), 2838 UserOptions = [user(User)]. 2839authenticate(_, _, []).
throw(http_reply(authorise(basic(Realm))))
Start a normal HTTP login challenge (reply 401)throw(http_reply(forbidden(Path)))
)
Reject the request using a 403 repply.2861pengine_register_user(Options) :- 2862 option(user(User), Options), 2863 !, 2864 pengine_self(Me), 2865 asserta(pengine_user(Me, User)). 2866pengine_register_user(_).
2877pengine_user(User) :-
2878 pengine_self(Me),
2879 pengine_user(Me, User).
2885reply_options(Request, Allowed) :- 2886 option(method(options), Request), 2887 !, 2888 cors_enable(Request, 2889 [ methods(Allowed) 2890 ]), 2891 format('Content-type: text/plain\r\n'), 2892 format('~n'). % empty body 2893 2894 2895 /******************************* 2896 * COMPILE SOURCE * 2897 *******************************/
2906pengine_src_text(Src, Module) :- 2907 pengine_self(Self), 2908 format(atom(ID), 'pengine://~w/src', [Self]), 2909 extra_load_options(Self, Options), 2910 setup_call_cleanup( 2911 open_chars_stream(Src, Stream), 2912 load_files(Module:ID, 2913 [ stream(Stream), 2914 module(Module), 2915 silent(true) 2916 | Options 2917 ]), 2918 close(Stream)), 2919 keep_source(Self, ID, Src). 2920 2921system'#file'(File, _Line) :- 2922 prolog_load_context(stream, Stream), 2923 set_stream(Stream, file_name(File)), 2924 set_stream(Stream, record_position(false)), 2925 set_stream(Stream, record_position(true)).
2935pengine_src_url(URL, Module) :- 2936 pengine_self(Self), 2937 uri_encoded(path, URL, Path), 2938 format(atom(ID), 'pengine://~w/url/~w', [Self, Path]), 2939 extra_load_options(Self, Options), 2940 ( get_pengine_application(Self, Application), 2941 setting(Application:debug_info, false) 2942 -> setup_call_cleanup( 2943 http_open(URL, Stream, []), 2944 ( set_stream(Stream, encoding(utf8)), 2945 load_files(Module:ID, 2946 [ stream(Stream), 2947 module(Module) 2948 | Options 2949 ]) 2950 ), 2951 close(Stream)) 2952 ; setup_call_cleanup( 2953 http_open(URL, TempStream, []), 2954 ( set_stream(TempStream, encoding(utf8)), 2955 read_string(TempStream, _, Src) 2956 ), 2957 close(TempStream)), 2958 setup_call_cleanup( 2959 open_chars_stream(Src, Stream), 2960 load_files(Module:ID, 2961 [ stream(Stream), 2962 module(Module) 2963 | Options 2964 ]), 2965 close(Stream)), 2966 keep_source(Self, ID, Src) 2967 ). 2968 2969 2970extra_load_options(Pengine, Options) :- 2971 pengine_not_sandboxed(Pengine), 2972 !, 2973 Options = []. 2974extra_load_options(_, [sandboxed(true)]). 2975 2976 2977keep_source(Pengine, ID, SrcText) :- 2978 get_pengine_application(Pengine, Application), 2979 setting(Application:debug_info, true), 2980 !, 2981 to_string(SrcText, SrcString), 2982 assertz(pengine_data(Pengine, source(ID, SrcString))). 2983keep_source(_, _, _). 2984 2985to_string(String, String) :- 2986 string(String), 2987 !. 2988to_string(Atom, String) :- 2989 atom_string(Atom, String), 2990 !. 2991 2992 /******************************* 2993 * SANDBOX * 2994 *******************************/ 2995 2996:- multifile 2997 sandbox:safe_primitive/1. 2998 2999sandbox:safe_primitive(pengines:pengine_input(_, _)). 3000sandbox:safe_primitive(pengines:pengine_output(_)). 3001sandbox:safe_primitive(pengines:pengine_debug(_,_)). 3002 3003 3004 /******************************* 3005 * MESSAGES * 3006 *******************************/ 3007 3008prologerror_message(sandbox(time_limit_exceeded, Limit)) --> 3009 [ 'Could not prove safety of your goal within ~f seconds.'-[Limit], nl, 3010 'This is normally caused by an insufficiently instantiated'-[], nl, 3011 'meta-call (e.g., call(Var)) for which it is too expensive to'-[], nl, 3012 'find all possible instantations of Var.'-[] 3013 ]
Pengines: Web Logic Programming Made Easy
The
library(pengines)
provides an infrastructure for creating Prolog engines in a (remote) pengine server and accessing these engines either from Prolog or JavaScript.