View source with formatted comments or as raw
    1:- encoding(utf8).
    2/*  Part of SWI-Prolog
    3
    4    Author:        Torbjörn Lager and Jan Wielemaker
    5    E-mail:        J.Wielemaker@vu.nl
    6    WWW:           http://www.swi-prolog.org
    7    Copyright (C): 2014-2016, Torbjörn Lager,
    8                              VU University Amsterdam
    9    All rights reserved.
   10
   11    Redistribution and use in source and binary forms, with or without
   12    modification, are permitted provided that the following conditions
   13    are met:
   14
   15    1. Redistributions of source code must retain the above copyright
   16       notice, this list of conditions and the following disclaimer.
   17
   18    2. Redistributions in binary form must reproduce the above copyright
   19       notice, this list of conditions and the following disclaimer in
   20       the documentation and/or other materials provided with the
   21       distribution.
   22
   23    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   24    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   25    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   26    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   27    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   28    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   29    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   30    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   31    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   32    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   33    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   34    POSSIBILITY OF SUCH DAMAGE.
   35*/
   36
   37:- module(pengines,
   38          [ pengine_create/1,                   % +Options
   39            pengine_ask/3,                      % +Pengine, :Query, +Options
   40            pengine_next/2,                     % +Pengine. +Options
   41            pengine_stop/2,                     % +Pengine. +Options
   42            pengine_event/2,                    % -Event, +Options
   43            pengine_input/2,                    % +Prompt, -Term
   44            pengine_output/1,                   % +Term
   45            pengine_respond/3,                  % +Pengine, +Input, +Options
   46            pengine_debug/2,                    % +Format, +Args
   47            pengine_self/1,                     % -Pengine
   48            pengine_pull_response/2,            % +Pengine, +Options
   49            pengine_destroy/1,                  % +Pengine
   50            pengine_destroy/2,                  % +Pengine, +Options
   51            pengine_abort/1,                    % +Pengine
   52            pengine_application/1,              % +Application
   53            current_pengine_application/1,      % ?Application
   54            pengine_property/2,                 % ?Pengine, ?Property
   55            pengine_user/1,                     % -User
   56            pengine_event_loop/2,               % :Closure, +Options
   57            pengine_rpc/2,                      % +Server, :Goal
   58            pengine_rpc/3                       % +Server, :Goal, +Options
   59          ]).   60
   61/** <module> Pengines: Web Logic Programming Made Easy
   62
   63The library(pengines) provides an  infrastructure   for  creating Prolog
   64engines in a (remote) pengine server  and accessing these engines either
   65from Prolog or JavaScript.
   66
   67@author Torbjörn Lager and Jan Wielemaker
   68*/
   69
   70:- use_module(library(http/http_dispatch)).   71:- use_module(library(http/http_parameters)).   72:- use_module(library(http/http_client)).   73:- use_module(library(http/http_json)).   74:- use_module(library(http/http_open)).   75:- use_module(library(http/http_stream)).   76:- use_module(library(http/http_wrapper)).   77:- use_module(library(http/http_cors)).   78:- use_module(library(thread_pool)).   79:- use_module(library(broadcast)).   80:- use_module(library(uri)).   81:- use_module(library(filesex)).   82:- use_module(library(time)).   83:- use_module(library(lists)).   84:- use_module(library(charsio)).   85:- use_module(library(apply)).   86:- use_module(library(aggregate)).   87:- use_module(library(option)).   88:- use_module(library(settings)).   89:- use_module(library(debug)).   90:- use_module(library(error)).   91:- use_module(library(sandbox)).   92:- use_module(library(modules)).   93:- use_module(library(term_to_json)).   94:- if(exists_source(library(uuid))).   95:- use_module(library(uuid)).   96:- endif.   97
   98
   99:- meta_predicate
  100    pengine_create(:),
  101    pengine_rpc(+, +, :),
  102    pengine_event_loop(1, +).  103
  104:- multifile
  105    write_result/3,                 % +Format, +Event, +Dict
  106    event_to_json/3,                % +Event, -JSON, +Format
  107    prepare_module/3,               % +Module, +Application, +Options
  108    prepare_goal/3,                 % +GoalIn, -GoalOut, +Options
  109    authentication_hook/3,          % +Request, +Application, -User
  110    not_sandboxed/2.                % +User, +App
  111
  112:- predicate_options(pengine_create/1, 1,
  113                     [ id(-atom),
  114                       alias(atom),
  115                       application(atom),
  116                       destroy(boolean),
  117                       server(atom),
  118                       ask(compound),
  119                       template(compound),
  120                       chunk(integer),
  121                       bindings(list),
  122                       src_list(list),
  123                       src_text(any),           % text
  124                       src_url(atom),
  125                       src_predicates(list)
  126                     ]).  127:- predicate_options(pengine_ask/3, 3,
  128                     [ template(any),
  129                       chunk(integer),
  130                       bindings(list)
  131                     ]).  132:- predicate_options(pengine_next/2, 2,
  133                     [ chunk(integer),
  134                       pass_to(pengine_send/3, 3)
  135                     ]).  136:- predicate_options(pengine_stop/2, 2,
  137                     [ pass_to(pengine_send/3, 3)
  138                     ]).  139:- predicate_options(pengine_respond/3, 2,
  140                     [ pass_to(pengine_send/3, 3)
  141                     ]).  142:- predicate_options(pengine_rpc/3, 3,
  143                     [ chunk(integer),
  144                       pass_to(pengine_create/1, 1)
  145                     ]).  146:- predicate_options(pengine_send/3, 3,
  147                     [ delay(number)
  148                     ]).  149:- predicate_options(pengine_event/2, 2,
  150                     [ pass_to(thread_get_message/3, 3)
  151                     ]).  152:- predicate_options(pengine_pull_response/2, 2,
  153                     [ pass_to(http_open/3, 3)
  154                     ]).  155:- predicate_options(pengine_event_loop/2, 2,
  156                     []).                       % not yet implemented
  157
  158% :- debug(pengine(transition)).
  159:- debug(pengine(debug)).               % handle pengine_debug in pengine_rpc/3.
  160
  161goal_expansion(random_delay, Expanded) :-
  162    (   debugging(pengine(delay))
  163    ->  Expanded = do_random_delay
  164    ;   Expanded = true
  165    ).
  166
  167do_random_delay :-
  168    Delay is random(20)/1000,
  169    sleep(Delay).
  170
  171:- meta_predicate                       % internal meta predicates
  172    solve(+, ?, 0, +),
  173    findnsols_no_empty(+, ?, 0, -),
  174    pengine_event_loop(+, 1, +).  175
  176/**  pengine_create(:Options) is det.
  177
  178    Creates a new pengine. Valid options are:
  179
  180    * id(-ID)
  181      ID gets instantiated to the id of the created pengine.  ID is
  182      atomic.
  183
  184    * alias(+Name)
  185      The pengine is named Name (an atom). A slave pengine (child) can
  186      subsequently be referred to by this name.
  187
  188    * application(+Application)
  189      Application in which the pengine runs.  See pengine_application/1.
  190
  191    * server(+URL)
  192      The pengine will run in (and in the Prolog context of) the pengine
  193      server located at URL.
  194
  195    * src_list(+List_of_clauses)
  196      Inject a list of Prolog clauses into the pengine.
  197
  198    * src_text(+Atom_or_string)
  199      Inject the clauses specified by a source text into the pengine.
  200
  201    * src_url(+URL)
  202      Inject the clauses specified in the file located at URL into the
  203      pengine.
  204
  205    * src_predicates(+List)
  206      Send the local predicates denoted by List to the remote pengine.
  207      List is a list of predicate indicators.
  208
  209Remaining  options  are  passed  to  http_open/3  (meaningful  only  for
  210non-local pengines) and thread_create/3. Note   that for thread_create/3
  211only options changing the stack-sizes can be used. In particular, do not
  212pass the detached or alias options..
  213
  214Successful creation of a pengine will return an _event term_ of the
  215following form:
  216
  217    * create(ID, Term)
  218      ID is the id of the pengine that was created.
  219      Term is not used at the moment.
  220
  221An error will be returned if the pengine could not be created:
  222
  223    * error(ID, Term)
  224      ID is invalid, since no pengine was created.
  225      Term is the exception's error term.
  226*/
  227
  228
  229pengine_create(M:Options0) :-
  230    translate_local_sources(Options0, Options, M),
  231    (   select_option(server(BaseURL), Options, RestOptions)
  232    ->  remote_pengine_create(BaseURL, RestOptions)
  233    ;   local_pengine_create(Options)
  234    ).
  235
  236%!  translate_local_sources(+OptionsIn, -Options, +Module) is det.
  237%
  238%   Translate  the  `src_predicates`  and  `src_list`  options  into
  239%   `src_text`. We need to do that   anyway for remote pengines. For
  240%   local pengines, we could avoid  this   step,  but  there is very
  241%   little point in transferring source to a local pengine anyway as
  242%   local pengines can access any  Prolog   predicate  that you make
  243%   visible to the application.
  244%
  245%   Multiple sources are concatenated  to  end   up  with  a  single
  246%   src_text option.
  247
  248translate_local_sources(OptionsIn, Options, Module) :-
  249    translate_local_sources(OptionsIn, Sources, Options2, Module),
  250    (   Sources == []
  251    ->  Options = Options2
  252    ;   Sources = [Source]
  253    ->  Options = [src_text(Source)|Options2]
  254    ;   atomics_to_string(Sources, Source)
  255    ->  Options = [src_text(Source)|Options2]
  256    ).
  257
  258translate_local_sources([], [], [], _).
  259translate_local_sources([H0|T], [S0|S], Options, M) :-
  260    nonvar(H0),
  261    translate_local_source(H0, S0, M),
  262    !,
  263    translate_local_sources(T, S, Options, M).
  264translate_local_sources([H|T0], S, [H|T], M) :-
  265    translate_local_sources(T0, S, T, M).
  266
  267translate_local_source(src_predicates(PIs), Source, M) :-
  268    must_be(list, PIs),
  269    with_output_to(string(Source),
  270                   maplist(list_in_module(M), PIs)).
  271translate_local_source(src_list(Terms), Source, _) :-
  272    must_be(list, Terms),
  273    with_output_to(string(Source),
  274                   forall(member(Term, Terms),
  275                          format('~k .~n', [Term]))).
  276translate_local_source(src_text(Source), Source, _).
  277
  278list_in_module(M, PI) :-
  279    listing(M:PI).
  280
  281/**  pengine_send(+NameOrID, +Term) is det
  282
  283Same as pengine_send(NameOrID, Term, []).
  284*/
  285
  286pengine_send(Target, Event) :-
  287    pengine_send(Target, Event, []).
  288
  289
  290/**  pengine_send(+NameOrID, +Term, +Options) is det
  291
  292Succeeds immediately and  places  Term  in   the  queue  of  the pengine
  293NameOrID. Options is a list of options:
  294
  295   * delay(+Time)
  296     The actual sending is delayed by Time seconds. Time is an integer
  297     or a float.
  298
  299Any remaining options are passed to http_open/3.
  300*/
  301
  302pengine_send(Target, Event, Options) :-
  303    must_be(atom, Target),
  304    pengine_send2(Target, Event, Options).
  305
  306pengine_send2(self, Event, Options) :-
  307    !,
  308    thread_self(Queue),
  309    delay_message(queue(Queue), Event, Options).
  310pengine_send2(Name, Event, Options) :-
  311    child(Name, Target),
  312    !,
  313    delay_message(pengine(Target), Event, Options).
  314pengine_send2(Target, Event, Options) :-
  315    delay_message(pengine(Target), Event, Options).
  316
  317delay_message(Target, Event, Options) :-
  318    option(delay(Delay), Options),
  319    !,
  320    alarm(Delay,
  321          send_message(Target, Event, Options),
  322          _AlarmID,
  323          [remove(true)]).
  324delay_message(Target, Event, Options) :-
  325    random_delay,
  326    send_message(Target, Event, Options).
  327
  328send_message(queue(Queue), Event, _) :-
  329    thread_send_message(Queue, pengine_request(Event)).
  330send_message(pengine(Pengine), Event, Options) :-
  331    (   pengine_remote(Pengine, Server)
  332    ->  remote_pengine_send(Server, Pengine, Event, Options)
  333    ;   pengine_thread(Pengine, Thread)
  334    ->  thread_send_message(Thread, pengine_request(Event))
  335    ;   existence_error(pengine, Pengine)
  336    ).
  337
  338%!  pengine_request(-Request) is det.
  339%
  340%   To be used by a  pengine  to   wait  for  the next request. Such
  341%   messages are placed in the queue by pengine_send/2.
  342
  343pengine_request(Request) :-
  344    pengine_self(Self),
  345    get_pengine_application(Self, Application),
  346    setting(Application:idle_limit, IdleLimit),
  347    thread_self(Me),
  348    (   thread_get_message(Me, pengine_request(Request), [timeout(IdleLimit)])
  349    ->  true
  350    ;   Request = destroy
  351    ).
  352
  353
  354%!  pengine_reply(+Event) is det.
  355%!  pengine_reply(+Queue, +Event) is det.
  356%
  357%   Reply Event to the parent of the   current  Pengine or the given
  358%   Queue.  Such  events  are  read   by    the   other   side  with
  359%   pengine_event/1.
  360%
  361%   If the message cannot be sent within the `idle_limit` setting of
  362%   the pengine, abort the pengine.
  363
  364pengine_reply(Event) :-
  365    pengine_parent(Queue),
  366    pengine_reply(Queue, Event).
  367
  368pengine_reply(_Queue, _Event0) :-
  369    nb_current(pengine_idle_limit_exceeded, true),
  370    !.
  371pengine_reply(Queue, Event0) :-
  372    arg(1, Event0, ID),
  373    wrap_first_answer(ID, Event0, Event),
  374    random_delay,
  375    debug(pengine(event), 'Reply to ~p: ~p', [Queue, Event]),
  376    (   pengine_self(ID)
  377    ->  get_pengine_application(ID, Application),
  378        setting(Application:idle_limit, IdleLimit),
  379        debug(pengine(reply), 'Sending ~p, timout: ~q', [Event, IdleLimit]),
  380        (   thread_send_message(Queue, pengine_event(ID, Event),
  381                                [ timeout(IdleLimit)
  382                                ])
  383        ->  true
  384        ;   thread_self(Me),
  385            debug(pengine(reply), 'pengine_reply: timeout for ~q (thread ~q)',
  386                  [ID, Me]),
  387            nb_setval(pengine_idle_limit_exceeded, true),
  388            thread_detach(Me),
  389            abort
  390        )
  391    ;   thread_send_message(Queue, pengine_event(ID, Event))
  392    ).
  393
  394wrap_first_answer(ID, Event0, CreateEvent) :-
  395    wrap_first_answer_in_create_event(CreateEvent, [answer(Event0)]),
  396    arg(1, CreateEvent, ID),
  397    !,
  398    retract(wrap_first_answer_in_create_event(CreateEvent, [answer(Event0)])).
  399wrap_first_answer(_ID, Event, Event).
  400
  401
  402empty_queue :-
  403    pengine_parent(Queue),
  404    empty_queue(Queue, 0, Discarded),
  405    debug(pengine(abort), 'Abort: discarded ~D messages', [Discarded]).
  406
  407empty_queue(Queue, C0, C) :-
  408    thread_get_message(Queue, _Term, [timeout(0)]),
  409    !,
  410    C1 is C0+1,
  411    empty_queue(Queue, C1, C).
  412empty_queue(_, C, C).
  413
  414
  415/** pengine_ask(+NameOrID, @Query, +Options) is det
  416
  417Asks pengine NameOrID a query Query.
  418
  419Options is a list of options:
  420
  421    * template(+Template)
  422      Template is a variable (or a term containing variables) shared
  423      with the query. By default, the template is identical to the
  424      query.
  425
  426    * chunk(+Integer)
  427      Retrieve solutions in chunks of Integer rather than one by one. 1
  428      means no chunking (default). Other integers indicate the maximum
  429      number of solutions to retrieve in one chunk.
  430
  431    * bindings(+Bindings)
  432      Sets the global variable '$variable_names' to a list of
  433      `Name = Var` terms, providing access to the actual variable
  434      names.
  435
  436Any remaining options are passed to pengine_send/3.
  437
  438Note that the predicate pengine_ask/3 is deterministic, even for queries
  439that have more than one solution. Also,  the variables in Query will not
  440be bound. Instead, results will  be  returned   in  the  form  of _event
  441terms_.
  442
  443    * success(ID, Terms, Projection, Time, More)
  444      ID is the id of the pengine that succeeded in solving the query.
  445      Terms is a list holding instantiations of `Template`.  Projection
  446      is a list of variable names that should be displayed. Time is
  447      the CPU time used to produce the results and finally, More
  448      is either `true` or `false`, indicating whether we can expect the
  449      pengine to be able to return more solutions or not, would we call
  450      pengine_next/2.
  451
  452    * failure(ID)
  453      ID is the id of the pengine that failed for lack of a solutions.
  454
  455    * error(ID, Term)
  456      ID is the id of the pengine throwing the exception.
  457      Term is the exception's error term.
  458
  459    * output(ID, Term)
  460      ID is the id of a pengine running the query that called
  461      pengine_output/1. Term is the term that was passed in the first
  462      argument of pengine_output/1 when it was called.
  463
  464    * prompt(ID, Term)
  465      ID is the id of the pengine that called pengine_input/2 and Term is
  466      the prompt.
  467
  468Defined in terms of pengine_send/3, like so:
  469
  470==
  471pengine_ask(ID, Query, Options) :-
  472    partition(pengine_ask_option, Options, AskOptions, SendOptions),
  473    pengine_send(ID, ask(Query, AskOptions), SendOptions).
  474==
  475*/
  476
  477pengine_ask(ID, Query, Options) :-
  478    partition(pengine_ask_option, Options, AskOptions, SendOptions),
  479    pengine_send(ID, ask(Query, AskOptions), SendOptions).
  480
  481
  482pengine_ask_option(template(_)).
  483pengine_ask_option(chunk(_)).
  484pengine_ask_option(bindings(_)).
  485pengine_ask_option(breakpoints(_)).
  486
  487
  488/** pengine_next(+NameOrID, +Options) is det
  489
  490Asks pengine NameOrID for the  next  solution   to  a  query  started by
  491pengine_ask/3. Defined options are:
  492
  493    * chunk(+Count)
  494    Modify the chunk-size to Count before asking the next set of
  495    solutions.
  496
  497Remaining  options  are  passed  to    pengine_send/3.   The  result  of
  498re-executing the current goal is returned  to the caller's message queue
  499in the form of _event terms_.
  500
  501    * success(ID, Terms, Projection, Time, More)
  502      See pengine_ask/3.
  503
  504    * failure(ID)
  505      ID is the id of the pengine that failed for lack of more solutions.
  506
  507    * error(ID, Term)
  508      ID is the id of the pengine throwing the exception.
  509      Term is the exception's error term.
  510
  511    * output(ID, Term)
  512      ID is the id of a pengine running the query that called
  513      pengine_output/1. Term is the term that was passed in the first
  514      argument of pengine_output/1 when it was called.
  515
  516    * prompt(ID, Term)
  517      ID is the id of the pengine that called pengine_input/2 and Term
  518      is the prompt.
  519
  520Defined in terms of pengine_send/3, as follows:
  521
  522==
  523pengine_next(ID, Options) :-
  524    pengine_send(ID, next, Options).
  525==
  526
  527*/
  528
  529pengine_next(ID, Options) :-
  530    select_option(chunk(Count), Options, Options1),
  531    !,
  532    pengine_send(ID, next(Count), Options1).
  533pengine_next(ID, Options) :-
  534    pengine_send(ID, next, Options).
  535
  536
  537/** pengine_stop(+NameOrID, +Options) is det
  538
  539Tells pengine NameOrID to stop looking  for   more  solutions to a query
  540started by pengine_ask/3. Options are passed to pengine_send/3.
  541
  542Defined in terms of pengine_send/3, like so:
  543
  544==
  545pengine_stop(ID, Options) :-
  546    pengine_send(ID, stop, Options).
  547==
  548*/
  549
  550pengine_stop(ID, Options) :- pengine_send(ID, stop, Options).
  551
  552
  553/** pengine_abort(+NameOrID) is det
  554
  555Aborts the running query. The pengine goes   back  to state `2', waiting
  556for new queries.
  557
  558@see pengine_destroy/1.
  559*/
  560
  561pengine_abort(Name) :-
  562    (   child(Name, Pengine)
  563    ->  true
  564    ;   Pengine = Name
  565    ),
  566    (   pengine_remote(Pengine, Server)
  567    ->  remote_pengine_abort(Server, Pengine, [])
  568    ;   pengine_thread(Pengine, Thread),
  569        debug(pengine(abort), 'Signalling thread ~p', [Thread]),
  570        catch(thread_signal(Thread, throw(abort_query)), _, true)
  571    ).
  572
  573
  574/** pengine_destroy(+NameOrID) is det.
  575    pengine_destroy(+NameOrID, +Options) is det.
  576
  577Destroys the pengine NameOrID.  With the option force(true), the pengine
  578is killed using abort/0 and pengine_destroy/2 succeeds.
  579*/
  580
  581pengine_destroy(ID) :-
  582    pengine_destroy(ID, []).
  583
  584pengine_destroy(Name, Options) :-
  585    (   child(Name, ID)
  586    ->  true
  587    ;   ID = Name
  588    ),
  589    option(force(true), Options),
  590    !,
  591    (   pengine_thread(ID, Thread)
  592    ->  catch(thread_signal(Thread, abort),
  593              error(existence_error(thread, _), _), true)
  594    ;   true
  595    ).
  596pengine_destroy(ID, _) :-
  597    catch(pengine_send(ID, destroy),
  598          error(existence_error(pengine, ID), _),
  599          retractall(child(_,ID))).
  600
  601
  602/*================= pengines administration =======================
  603*/
  604
  605%!  current_pengine(?Id, ?Parent, ?Location)
  606%
  607%   Dynamic predicate that registers our known pengines.  Id is
  608%   an atomic unique datatype.  Parent is the id of our parent
  609%   pengine.  Location is one of
  610%
  611%     - thread(ThreadId)
  612%     - remote(URL)
  613
  614:- dynamic
  615    current_pengine/6,              % Id, ParentId, Thread, URL, App, Destroy
  616    pengine_queue/4,                % Id, Queue, TimeOut, Time
  617    output_queue/3,                 % Id, Queue, Time
  618    pengine_user/2,                 % Id, User
  619    pengine_data/2.                 % Id, Data
  620:- volatile
  621    current_pengine/6,
  622    pengine_queue/4,
  623    output_queue/3,
  624    pengine_user/2,
  625    pengine_data/2.  626
  627:- thread_local
  628    child/2.                        % ?Name, ?Child
  629
  630%!  pengine_register_local(+Id, +Thread, +Queue, +URL, +App, +Destroy) is det.
  631%!  pengine_register_remote(+Id, +URL, +Queue, +App, +Destroy) is det.
  632%!  pengine_unregister(+Id) is det.
  633
  634pengine_register_local(Id, Thread, Queue, URL, Application, Destroy) :-
  635    asserta(current_pengine(Id, Queue, Thread, URL, Application, Destroy)).
  636
  637pengine_register_remote(Id, URL, Application, Destroy) :-
  638    thread_self(Queue),
  639    asserta(current_pengine(Id, Queue, 0, URL, Application, Destroy)).
  640
  641%!  pengine_unregister(+Id)
  642%
  643%   Called by the pengine thread  destruction.   If  we are a remote
  644%   pengine thread, our URL  equals  =http=   and  the  queue is the
  645%   message queue used to send events to the HTTP workers.
  646
  647pengine_unregister(Id) :-
  648    thread_self(Me),
  649    (   current_pengine(Id, Queue, Me, http, _, _)
  650    ->  with_mutex(pengine, sync_destroy_queue_from_pengine(Id, Queue))
  651    ;   true
  652    ),
  653    retractall(current_pengine(Id, _, Me, _, _, _)),
  654    retractall(pengine_user(Id, _)),
  655    retractall(pengine_data(Id, _)).
  656
  657pengine_unregister_remote(Id) :-
  658    retractall(current_pengine(Id, _Parent, 0, _, _, _)).
  659
  660%!  pengine_self(-Id) is det.
  661%
  662%   True if the current thread is a pengine with Id.
  663
  664pengine_self(Id) :-
  665    thread_self(Thread),
  666    current_pengine(Id, _Parent, Thread, _URL, _Application, _Destroy).
  667
  668pengine_parent(Parent) :-
  669    nb_getval(pengine_parent, Parent).
  670
  671pengine_thread(Pengine, Thread) :-
  672    current_pengine(Pengine, _Parent, Thread, _URL, _Application, _Destroy),
  673    Thread \== 0,
  674    !.
  675
  676pengine_remote(Pengine, URL) :-
  677    current_pengine(Pengine, _Parent, 0, URL, _Application, _Destroy).
  678
  679get_pengine_application(Pengine, Application) :-
  680    current_pengine(Pengine, _Parent, _, _URL, Application, _Destroy),
  681    !.
  682
  683get_pengine_module(Pengine, Pengine).
  684
  685:- if(current_predicate(uuid/2)).  686pengine_uuid(Id) :-
  687    uuid(Id, [version(4)]).             % Version 4 is random.
  688:- else.  689:- use_module(library(random)).  690pengine_uuid(Id) :-
  691    Max is 1<<128,
  692    random_between(0, Max, Num),
  693    atom_number(Id, Num).
  694:- endif.  695
  696/** pengine_application(+Application) is det.
  697
  698Directive that must be used to declare a pengine application module. The
  699module must not be associated to any   file.  The default application is
  700=pengine_sandbox=.  The  example  below  creates    a   new  application
  701=address_book=  and  imports  the  API  defined    in  the  module  file
  702=adress_book_api.pl= into the application.
  703
  704  ==
  705  :- pengine_application(address_book).
  706  :- use_module(address_book:adress_book_api).
  707  ==
  708*/
  709
  710pengine_application(Application) :-
  711    throw(error(context_error(nodirective,
  712                             pengine_application(Application)), _)).
  713
  714:- multifile
  715    system:term_expansion/2,
  716    current_application/1.  717
  718%!  current_pengine_application(?Application) is nondet.
  719%
  720%   True when Application is a currently defined application.
  721%
  722%   @see pengine_application/1
  723
  724current_pengine_application(Application) :-
  725    current_application(Application).
  726
  727
  728% Default settings for all applications
  729
  730:- setting(thread_pool_size, integer, 100,
  731           'Maximum number of pengines this application can run.').  732:- setting(thread_pool_stacks, list(compound), [],
  733           'Maximum stack sizes for pengines this application can run.').  734:- setting(slave_limit, integer, 3,
  735           'Maximum number of slave pengines a master pengine can create.').  736:- setting(time_limit, number, 300,
  737           'Maximum time to wait for output').  738:- setting(idle_limit, number, 300,
  739           'Pengine auto-destroys when idle for this time').  740:- setting(safe_goal_limit, number, 10,
  741           'Maximum time to try proving safety of the goal').  742:- setting(program_space, integer, 100_000_000,
  743           'Maximum memory used by predicates').  744:- setting(allow_from, list(atom), [*],
  745           'IP addresses from which remotes are allowed to connect').  746:- setting(deny_from, list(atom), [],
  747           'IP addresses from which remotes are NOT allowed to connect').  748:- setting(debug_info, boolean, false,
  749           'Keep information to support source-level debugging').  750
  751
  752system:term_expansion((:- pengine_application(Application)), Expanded) :-
  753    must_be(atom, Application),
  754    (   module_property(Application, file(_))
  755    ->  permission_error(create, pengine_application, Application)
  756    ;   true
  757    ),
  758    expand_term((:- setting(Application:thread_pool_size, integer,
  759                            setting(pengines:thread_pool_size),
  760                            'Maximum number of pengines this \c
  761                            application can run.')),
  762                ThreadPoolSizeSetting),
  763    expand_term((:- setting(Application:thread_pool_stacks, list(compound),
  764                            setting(pengines:thread_pool_stacks),
  765                            'Maximum stack sizes for pengines \c
  766                            this application can run.')),
  767                ThreadPoolStacksSetting),
  768    expand_term((:- setting(Application:slave_limit, integer,
  769                            setting(pengines:slave_limit),
  770                            'Maximum number of local slave pengines \c
  771                            a master pengine can create.')),
  772                SlaveLimitSetting),
  773    expand_term((:- setting(Application:time_limit, number,
  774                            setting(pengines:time_limit),
  775                            'Maximum time to wait for output')),
  776                TimeLimitSetting),
  777    expand_term((:- setting(Application:idle_limit, number,
  778                            setting(pengines:idle_limit),
  779                            'Pengine auto-destroys when idle for this time')),
  780                IdleLimitSetting),
  781    expand_term((:- setting(Application:safe_goal_limit, number,
  782                            setting(pengines:safe_goal_limit),
  783                            'Maximum time to try proving safety of the goal')),
  784                SafeGoalLimitSetting),
  785    expand_term((:- setting(Application:program_space, integer,
  786                            setting(pengines:program_space),
  787                            'Maximum memory used by predicates')),
  788                ProgramSpaceSetting),
  789    expand_term((:- setting(Application:allow_from, list(atom),
  790                            setting(pengines:allow_from),
  791                            'IP addresses from which remotes are allowed \c
  792                            to connect')),
  793                AllowFromSetting),
  794    expand_term((:- setting(Application:deny_from, list(atom),
  795                            setting(pengines:deny_from),
  796                            'IP addresses from which remotes are NOT \c
  797                            allowed to connect')),
  798                DenyFromSetting),
  799    expand_term((:- setting(Application:debug_info, boolean,
  800                            setting(pengines:debug_info),
  801                            'Keep information to support source-level \c
  802                            debugging')),
  803                DebugInfoSetting),
  804    flatten([ pengines:current_application(Application),
  805              ThreadPoolSizeSetting,
  806              ThreadPoolStacksSetting,
  807              SlaveLimitSetting,
  808              TimeLimitSetting,
  809              IdleLimitSetting,
  810              SafeGoalLimitSetting,
  811              ProgramSpaceSetting,
  812              AllowFromSetting,
  813              DenyFromSetting,
  814              DebugInfoSetting
  815            ], Expanded).
  816
  817% Register default application
  818
  819:- pengine_application(pengine_sandbox).  820
  821
  822/** pengine_property(?Pengine, ?Property) is nondet.
  823
  824True when Property is a property of   the  given Pengine. Enumerates all
  825pengines  that  are  known  to  the   calling  Prolog  process.  Defined
  826properties are:
  827
  828  * self(ID)
  829    Identifier of the pengine.  This is the same as the first argument,
  830    and can be used to enumerate all known pengines.
  831  * alias(Name)
  832    Name is the alias name of the pengine, as provided through the
  833    `alias` option when creating the pengine.
  834  * thread(Thread)
  835    If the pengine is a local pengine, Thread is the Prolog thread
  836    identifier of the pengine.
  837  * remote(Server)
  838    If the pengine is remote, the URL of the server.
  839  * application(Application)
  840    Pengine runs the given application
  841  * module(Module)
  842    Temporary module used for running the Pengine.
  843  * destroy(Destroy)
  844    Destroy is =true= if the pengines is destroyed automatically
  845    after completing the query.
  846  * parent(Queue)
  847    Message queue to which the (local) pengine reports.
  848  * source(?SourceID, ?Source)
  849    Source is the source code with the given SourceID. May be present if
  850    the setting `debug_info` is present.
  851*/
  852
  853
  854pengine_property(Id, Prop) :-
  855    nonvar(Id), nonvar(Prop),
  856    pengine_property2(Id, Prop),
  857    !.
  858pengine_property(Id, Prop) :-
  859    pengine_property2(Prop, Id).
  860
  861pengine_property2(self(Id), Id) :-
  862    current_pengine(Id, _Parent, _Thread, _URL, _Application, _Destroy).
  863pengine_property2(module(Id), Id) :-
  864    current_pengine(Id, _Parent, _Thread, _URL, _Application, _Destroy).
  865pengine_property2(alias(Alias), Id) :-
  866    child(Alias, Id),
  867    Alias \== Id.
  868pengine_property2(thread(Thread), Id) :-
  869    current_pengine(Id, _Parent, Thread, _URL, _Application, _Destroy),
  870    Thread \== 0.
  871pengine_property2(remote(Server), Id) :-
  872    current_pengine(Id, _Parent, 0, Server, _Application, _Destroy).
  873pengine_property2(application(Application), Id) :-
  874    current_pengine(Id, _Parent, _Thread, _Server, Application, _Destroy).
  875pengine_property2(destroy(Destroy), Id) :-
  876    current_pengine(Id, _Parent, _Thread, _Server, _Application, Destroy).
  877pengine_property2(parent(Parent), Id) :-
  878    current_pengine(Id, Parent, _Thread, _URL, _Application, _Destroy).
  879pengine_property2(source(SourceID, Source), Id) :-
  880    pengine_data(Id, source(SourceID, Source)).
  881
  882/** pengine_output(+Term) is det
  883
  884Sends Term to the parent pengine or thread.
  885*/
  886
  887pengine_output(Term) :-
  888    pengine_self(Me),
  889    pengine_reply(output(Me, Term)).
  890
  891
  892/** pengine_debug(+Format, +Args) is det
  893
  894Create a message using format/3 from Format   and  Args and send this to
  895the    client.    The    default    JavaScript    client    will    call
  896=|console.log(Message)|=  if  there  is   a    console.   The  predicate
  897pengine_rpc/3 calls debug(pengine(debug), '~w',   [Message]).  The debug
  898topic pengine(debug) is enabled by default.
  899
  900@see debug/1 and nodebug/1 for controlling the pengine(debug) topic
  901@see format/2 for format specifications
  902*/
  903
  904pengine_debug(Format, Args) :-
  905    pengine_parent(Queue),
  906    pengine_self(Self),
  907    catch(safe_goal(format(atom(_), Format, Args)), E, true),
  908    (   var(E)
  909    ->  format(atom(Message), Format, Args)
  910    ;   message_to_string(E, Message)
  911    ),
  912    pengine_reply(Queue, debug(Self, Message)).
  913
  914
  915/*================= Local pengine =======================
  916*/
  917
  918%!  local_pengine_create(+Options)
  919%
  920%   Creates  a  local   Pengine,   which    is   a   thread  running
  921%   pengine_main/2.  It maintains two predicates:
  922%
  923%     - The global dynamic predicate id/2 relates Pengines to their
  924%       childs.
  925%     - The local predicate id/2 maps named childs to their ids.
  926
  927local_pengine_create(Options) :-
  928    thread_self(Self),
  929    option(application(Application), Options, pengine_sandbox),
  930    create(Self, Child, Options, local, Application),
  931    option(alias(Name), Options, Child),
  932    assert(child(Name, Child)).
  933
  934
  935%!  thread_pool:create_pool(+Application) is det.
  936%
  937%   On demand creation of a thread pool for a pengine application.
  938
  939thread_pool:create_pool(Application) :-
  940    current_application(Application),
  941    setting(Application:thread_pool_size, Size),
  942    setting(Application:thread_pool_stacks, Stacks),
  943    thread_pool_create(Application, Size, Stacks).
  944
  945%!  create(+Queue, -Child, +Options, +URL, +Application) is det.
  946%
  947%   Create a new pengine thread.
  948%
  949%   @arg Queue is the queue (or thread handle) to report to
  950%   @arg Child is the identifier of the created pengine.
  951%   @arg URL is one of =local= or =http=
  952
  953create(Queue, Child, Options, local, Application) :-
  954    !,
  955    pengine_child_id(Child),
  956    create0(Queue, Child, Options, local, Application).
  957create(Queue, Child, Options, URL, Application) :-
  958    pengine_child_id(Child),
  959    catch(create0(Queue, Child, Options, URL, Application),
  960          Error,
  961          create_error(Queue, Child, Error)).
  962
  963pengine_child_id(Child) :-
  964    (   nonvar(Child)
  965    ->  true
  966    ;   pengine_uuid(Child)
  967    ).
  968
  969create_error(Queue, Child, Error) :-
  970    pengine_reply(Queue, error(Child, Error)).
  971
  972create0(Queue, Child, Options, URL, Application) :-
  973    (  current_application(Application)
  974    -> true
  975    ;  existence_error(pengine_application, Application)
  976    ),
  977    (   URL \== http                    % pengine is _not_ a child of the
  978                                        % HTTP server thread
  979    ->  aggregate_all(count, child(_,_), Count),
  980        setting(Application:slave_limit, Max),
  981        (   Count >= Max
  982        ->  throw(error(resource_error(max_pengines), _))
  983        ;   true
  984        )
  985    ;   true
  986    ),
  987    partition(pengine_create_option, Options, PengineOptions, RestOptions),
  988    thread_create_in_pool(
  989        Application,
  990        pengine_main(Queue, PengineOptions, Application), ChildThread,
  991        [ at_exit(pengine_done)
  992        | RestOptions
  993        ]),
  994    option(destroy(Destroy), PengineOptions, true),
  995    pengine_register_local(Child, ChildThread, Queue, URL, Application, Destroy),
  996    thread_send_message(ChildThread, pengine_registered(Child)),
  997    (   option(id(Id), Options)
  998    ->  Id = Child
  999    ;   true
 1000    ).
 1001
 1002pengine_create_option(src_text(_)).
 1003pengine_create_option(src_url(_)).
 1004pengine_create_option(application(_)).
 1005pengine_create_option(destroy(_)).
 1006pengine_create_option(ask(_)).
 1007pengine_create_option(template(_)).
 1008pengine_create_option(bindings(_)).
 1009pengine_create_option(chunk(_)).
 1010pengine_create_option(alias(_)).
 1011pengine_create_option(user(_)).
 1012
 1013
 1014%!  pengine_done is det.
 1015%
 1016%   Called  from  the  pengine  thread  =at_exit=  option.  Destroys
 1017%   _child_ pengines using pengine_destroy/1.
 1018
 1019:- public
 1020    pengine_done/0. 1021
 1022pengine_done :-
 1023    thread_self(Me),
 1024    (   thread_property(Me, status(exception('$aborted'))),
 1025        thread_detach(Me),
 1026        pengine_self(Pengine)
 1027    ->  catch(pengine_reply(destroy(Pengine, abort(Pengine))),
 1028              error(_,_), true)
 1029    ;   true
 1030    ),
 1031    forall(child(_Name, Child),
 1032           pengine_destroy(Child)),
 1033    pengine_self(Id),
 1034    pengine_unregister(Id).
 1035
 1036
 1037%!  pengine_main(+Parent, +Options, +Application)
 1038%
 1039%   Run a pengine main loop. First acknowledges its creation and run
 1040%   pengine_main_loop/1.
 1041
 1042:- thread_local wrap_first_answer_in_create_event/2. 1043
 1044:- meta_predicate
 1045    pengine_prepare_source(:, +). 1046
 1047pengine_main(Parent, Options, Application) :-
 1048    fix_streams,
 1049    thread_get_message(pengine_registered(Self)),
 1050    nb_setval(pengine_parent, Parent),
 1051    pengine_register_user(Options),
 1052    set_prolog_flag(mitigate_spectre, true),
 1053    catch(in_temporary_module(
 1054              Self,
 1055              pengine_prepare_source(Application, Options),
 1056              pengine_create_and_loop(Self, Application, Options)),
 1057          prepare_source_failed,
 1058          pengine_terminate(Self)).
 1059
 1060pengine_create_and_loop(Self, Application, Options) :-
 1061    setting(Application:slave_limit, SlaveLimit),
 1062    CreateEvent = create(Self, [slave_limit(SlaveLimit)|Extra]),
 1063    (   option(ask(Query0), Options)
 1064    ->  asserta(wrap_first_answer_in_create_event(CreateEvent, Extra)),
 1065        (   string(Query0)                      % string is not callable
 1066        ->  (   option(template(TemplateS), Options)
 1067            ->  Ask2 = Query0-TemplateS
 1068            ;   Ask2 = Query0
 1069            ),
 1070            catch(ask_to_term(Ask2, Self, Query, Template, Bindings),
 1071                  Error, true),
 1072            (   var(Error)
 1073            ->  true
 1074            ;   send_error(Error),
 1075                throw(prepare_source_failed)
 1076            )
 1077        ;   Query = Query0,
 1078            option(template(Template), Options, Query),
 1079            option(bindings(Bindings), Options, [])
 1080        ),
 1081        option(chunk(Chunk), Options, 1),
 1082        pengine_ask(Self, Query,
 1083                    [ template(Template),
 1084                      chunk(Chunk),
 1085                      bindings(Bindings)
 1086                    ])
 1087    ;   Extra = [],
 1088        pengine_reply(CreateEvent)
 1089    ),
 1090    pengine_main_loop(Self).
 1091
 1092
 1093%!  ask_to_term(+AskSpec, +Module, -Options, OptionsTail) is det.
 1094%
 1095%   Translate the AskSpec into a query, template and bindings. The trick
 1096%   is that we must parse using the  operator declarations of the source
 1097%   and we must make sure  variable   sharing  between  query and answer
 1098%   template are known.
 1099
 1100ask_to_term(Ask-Template, Module, Ask1, Template1, Bindings) :-
 1101    !,
 1102    format(string(AskTemplate), 't((~s),(~s))', [Template, Ask]),
 1103    term_string(t(Template1,Ask1), AskTemplate,
 1104                [ variable_names(Bindings0),
 1105                  module(Module)
 1106                ]),
 1107    phrase(template_bindings(Template1, Bindings0), Bindings).
 1108ask_to_term(Ask, Module, Ask1, Template, Bindings1) :-
 1109    term_string(Ask1, Ask,
 1110                [ variable_names(Bindings),
 1111                  module(Module)
 1112                ]),
 1113    exclude(anon, Bindings, Bindings1),
 1114    dict_create(Template, swish_default_template, Bindings1).
 1115
 1116template_bindings(Var, Bindings) -->
 1117    { var(Var) }, !,
 1118    (   { var_binding(Bindings, Var, Binding)
 1119        }
 1120    ->  [Binding]
 1121    ;   []
 1122    ).
 1123template_bindings([H|T], Bindings) -->
 1124    !,
 1125    template_bindings(H, Bindings),
 1126    template_bindings(T, Bindings).
 1127template_bindings(Compoound, Bindings) -->
 1128    { compound(Compoound), !,
 1129      compound_name_arguments(Compoound, _, Args)
 1130    },
 1131    template_bindings(Args, Bindings).
 1132template_bindings(_, _) --> [].
 1133
 1134var_binding(Bindings, Var, Binding) :-
 1135    member(Binding, Bindings),
 1136    arg(2, Binding, V),
 1137    V == Var, !.
 1138
 1139%!  fix_streams is det.
 1140%
 1141%   If we are a pengine that is   created  from a web server thread,
 1142%   the current output points to a CGI stream.
 1143
 1144fix_streams :-
 1145    fix_stream(current_output).
 1146
 1147fix_stream(Name) :-
 1148    is_cgi_stream(Name),
 1149    !,
 1150    debug(pengine(stream), '~w is a CGI stream!', [Name]),
 1151    set_stream(user_output, alias(Name)).
 1152fix_stream(_).
 1153
 1154%!  pengine_prepare_source(:Application, +Options) is det.
 1155%
 1156%   Load the source into the pengine's module.
 1157%
 1158%   @throws =prepare_source_failed= if it failed to prepare the
 1159%           sources.
 1160
 1161pengine_prepare_source(Module:Application, Options) :-
 1162    setting(Application:program_space, SpaceLimit),
 1163    set_module(Module:program_space(SpaceLimit)),
 1164    delete_import_module(Module, user),
 1165    add_import_module(Module, Application, start),
 1166    catch(prep_module(Module, Application, Options), Error, true),
 1167    (   var(Error)
 1168    ->  true
 1169    ;   send_error(Error),
 1170        throw(prepare_source_failed)
 1171    ).
 1172
 1173prep_module(Module, Application, Options) :-
 1174    maplist(copy_flag(Module, Application), [var_prefix]),
 1175    forall(prepare_module(Module, Application, Options), true),
 1176    setup_call_cleanup(
 1177        '$set_source_module'(OldModule, Module),
 1178        maplist(process_create_option(Module), Options),
 1179        '$set_source_module'(OldModule)).
 1180
 1181copy_flag(Module, Application, Flag) :-
 1182    current_prolog_flag(Application:Flag, Value),
 1183    !,
 1184    set_prolog_flag(Module:Flag, Value).
 1185copy_flag(_, _, _).
 1186
 1187process_create_option(Application, src_text(Text)) :-
 1188    !,
 1189    pengine_src_text(Text, Application).
 1190process_create_option(Application, src_url(URL)) :-
 1191    !,
 1192    pengine_src_url(URL, Application).
 1193process_create_option(_, _).
 1194
 1195
 1196%!  prepare_module(+Module, +Application, +Options) is semidet.
 1197%
 1198%   Hook, called to initialize  the   temporary  private module that
 1199%   provides the working context of a pengine. This hook is executed
 1200%   by the pengine's thread.  Preparing the source consists of three
 1201%   steps:
 1202%
 1203%     1. Add Application as (first) default import module for Module
 1204%     2. Call this hook
 1205%     3. Compile the source provided by the the `src_text` and
 1206%        `src_url` options
 1207%
 1208%   @arg    Module is a new temporary module (see
 1209%           in_temporary_module/3) that may be (further) prepared
 1210%           by this hook.
 1211%   @arg    Application (also a module) associated to the pengine.
 1212%   @arg    Options is passed from the environment and should
 1213%           (currently) be ignored.
 1214
 1215
 1216pengine_main_loop(ID) :-
 1217    catch(guarded_main_loop(ID), abort_query, pengine_aborted(ID)).
 1218
 1219pengine_aborted(ID) :-
 1220    thread_self(Self),
 1221    debug(pengine(abort), 'Aborting ~p (thread ~p)', [ID, Self]),
 1222    empty_queue,
 1223    destroy_or_continue(abort(ID)).
 1224
 1225
 1226%!  guarded_main_loop(+Pengine) is det.
 1227%
 1228%   Executes state `2' of  the  pengine,   where  it  waits  for two
 1229%   events:
 1230%
 1231%     - destroy
 1232%     Terminate the pengine
 1233%     - ask(:Goal, +Options)
 1234%     Solve Goal.
 1235
 1236guarded_main_loop(ID) :-
 1237    pengine_request(Request),
 1238    (   Request = destroy
 1239    ->  debug(pengine(transition), '~q: 2 = ~q => 1', [ID, destroy]),
 1240        pengine_terminate(ID)
 1241    ;   Request = ask(Goal, Options)
 1242    ->  debug(pengine(transition), '~q: 2 = ~q => 3', [ID, ask(Goal)]),
 1243        ask(ID, Goal, Options)
 1244    ;   debug(pengine(transition), '~q: 2 = ~q => 2', [ID, protocol_error]),
 1245        pengine_reply(error(ID, error(protocol_error, _))),
 1246        guarded_main_loop(ID)
 1247    ).
 1248
 1249
 1250pengine_terminate(ID) :-
 1251    pengine_reply(destroy(ID)),
 1252    thread_self(Me),            % Make the thread silently disappear
 1253    thread_detach(Me).
 1254
 1255
 1256%!  solve(+Chunk, +Template, :Goal, +ID) is det.
 1257%
 1258%   Solve Goal. Note that because we can ask for a new goal in state
 1259%   `6', we must provide for an ancesteral cut (prolog_cut_to/1). We
 1260%   need to be sure to  have  a   choice  point  before  we can call
 1261%   prolog_current_choice/1. This is the reason   why this predicate
 1262%   has two clauses.
 1263
 1264solve(Chunk, Template, Goal, ID) :-
 1265    prolog_current_choice(Choice),
 1266    State = count(Chunk),
 1267    statistics(cputime, Epoch),
 1268    Time = time(Epoch),
 1269    nb_current('$variable_names', Bindings),
 1270    filter_template(Template, Bindings, Template2),
 1271    '$current_typein_module'(CurrTypeIn),
 1272    (   '$set_typein_module'(ID),
 1273        call_cleanup(catch(findnsols_no_empty(State, Template2,
 1274                                              set_projection(Goal, Bindings),
 1275                                              Result),
 1276                           Error, true),
 1277                     query_done(Det, CurrTypeIn)),
 1278        arg(1, Time, T0),
 1279        statistics(cputime, T1),
 1280        CPUTime is T1-T0,
 1281        (   var(Error)
 1282        ->  projection(Projection),
 1283            (   var(Det)
 1284            ->  pengine_reply(success(ID, Result, Projection,
 1285                                      CPUTime, true)),
 1286                more_solutions(ID, Choice, State, Time)
 1287            ;   !,                      % commit
 1288                destroy_or_continue(success(ID, Result, Projection,
 1289                                            CPUTime, false))
 1290            )
 1291        ;   !,                          % commit
 1292            (   Error == abort_query
 1293            ->  throw(Error)
 1294            ;   destroy_or_continue(error(ID, Error))
 1295            )
 1296        )
 1297    ;   !,                              % commit
 1298        arg(1, Time, T0),
 1299        statistics(cputime, T1),
 1300        CPUTime is T1-T0,
 1301        destroy_or_continue(failure(ID, CPUTime))
 1302    ).
 1303solve(_, _, _, _).                      % leave a choice point
 1304
 1305query_done(true, CurrTypeIn) :-
 1306    '$set_typein_module'(CurrTypeIn).
 1307
 1308
 1309%!  set_projection(:Goal, +Bindings)
 1310%
 1311%   findnsols/4 copies its goal  and   template  to  avoid instantiation
 1312%   thereof when it stops after finding   N solutions. Using this helper
 1313%   we can a renamed version of Bindings that we can set.
 1314
 1315set_projection(Goal, Bindings) :-
 1316    b_setval('$variable_names', Bindings),
 1317    call(Goal).
 1318
 1319projection(Projection) :-
 1320    nb_current('$variable_names', Bindings),
 1321    !,
 1322    maplist(var_name, Bindings, Projection).
 1323projection([]).
 1324
 1325%!  filter_template(+Template0, +Bindings, -Template) is det.
 1326%
 1327%   Establish the final template. This is   there  because hooks such as
 1328%   goal_expansion/2 and the SWISH query  hooks   can  modify the set of
 1329%   bindings.
 1330%
 1331%   @bug Projection and template handling is pretty messy.
 1332
 1333filter_template(Template0, Bindings, Template) :-
 1334    is_dict(Template0, swish_default_template),
 1335    !,
 1336    dict_create(Template, swish_default_template, Bindings).
 1337filter_template(Template, _Bindings, Template).
 1338
 1339findnsols_no_empty(N, Template, Goal, List) :-
 1340    findnsols(N, Template, Goal, List),
 1341    List \== [].
 1342
 1343destroy_or_continue(Event) :-
 1344    arg(1, Event, ID),
 1345    (   pengine_property(ID, destroy(true))
 1346    ->  thread_self(Me),
 1347        thread_detach(Me),
 1348        pengine_reply(destroy(ID, Event))
 1349    ;   pengine_reply(Event),
 1350        garbage_collect,                % minimise our footprint
 1351        trim_stacks,
 1352        guarded_main_loop(ID)
 1353    ).
 1354
 1355%!  more_solutions(+Pengine, +Choice, +State, +Time)
 1356%
 1357%   Called after a solution was found while  there can be more. This
 1358%   is state `6' of the state machine. It processes these events:
 1359%
 1360%     * stop
 1361%     Go back via state `7' to state `2' (guarded_main_loop/1)
 1362%     * next
 1363%     Fail.  This causes solve/3 to backtrack on the goal asked,
 1364%     providing at most the current `chunk` solutions.
 1365%     * next(Count)
 1366%     As `next`, but sets the new chunk-size to Count.
 1367%     * ask(Goal, Options)
 1368%     Ask another goal.  Note that we must commit the choice point
 1369%     of the previous goal asked for.
 1370
 1371more_solutions(ID, Choice, State, Time) :-
 1372    pengine_request(Event),
 1373    more_solutions(Event, ID, Choice, State, Time).
 1374
 1375more_solutions(stop, ID, _Choice, _State, _Time) :-
 1376    !,
 1377    debug(pengine(transition), '~q: 6 = ~q => 7', [ID, stop]),
 1378    destroy_or_continue(stop(ID)).
 1379more_solutions(next, ID, _Choice, _State, Time) :-
 1380    !,
 1381    debug(pengine(transition), '~q: 6 = ~q => 3', [ID, next]),
 1382    statistics(cputime, T0),
 1383    nb_setarg(1, Time, T0),
 1384    fail.
 1385more_solutions(next(Count), ID, _Choice, State, Time) :-
 1386    Count > 0,
 1387    !,
 1388    debug(pengine(transition), '~q: 6 = ~q => 3', [ID, next(Count)]),
 1389    nb_setarg(1, State, Count),
 1390    statistics(cputime, T0),
 1391    nb_setarg(1, Time, T0),
 1392    fail.
 1393more_solutions(ask(Goal, Options), ID, Choice, _State, _Time) :-
 1394    !,
 1395    debug(pengine(transition), '~q: 6 = ~q => 3', [ID, ask(Goal)]),
 1396    prolog_cut_to(Choice),
 1397    ask(ID, Goal, Options).
 1398more_solutions(destroy, ID, _Choice, _State, _Time) :-
 1399    !,
 1400    debug(pengine(transition), '~q: 6 = ~q => 1', [ID, destroy]),
 1401    pengine_terminate(ID).
 1402more_solutions(Event, ID, Choice, State, Time) :-
 1403    debug(pengine(transition), '~q: 6 = ~q => 6', [ID, protocol_error(Event)]),
 1404    pengine_reply(error(ID, error(protocol_error, _))),
 1405    more_solutions(ID, Choice, State, Time).
 1406
 1407%!  ask(+Pengine, :Goal, +Options)
 1408%
 1409%   Migrate from state `2' to `3'.  This predicate validates that it
 1410%   is safe to call Goal using safe_goal/1 and then calls solve/3 to
 1411%   prove the goal. It takes care of the chunk(N) option.
 1412
 1413ask(ID, Goal, Options) :-
 1414    catch(prepare_goal(ID, Goal, Goal1, Options), Error, true),
 1415    !,
 1416    (   var(Error)
 1417    ->  option(template(Template), Options, Goal),
 1418        option(chunk(N), Options, 1),
 1419        solve(N, Template, Goal1, ID)
 1420    ;   pengine_reply(error(ID, Error)),
 1421        guarded_main_loop(ID)
 1422    ).
 1423
 1424%!  prepare_goal(+Pengine, +GoalIn, -GoalOut, +Options) is det.
 1425%
 1426%   Prepare GoalIn for execution in Pengine.   This  implies we must
 1427%   perform goal expansion and, if the   system  is sandboxed, check
 1428%   the sandbox.
 1429%
 1430%   Note that expand_goal(Module:GoalIn, GoalOut) is  what we'd like
 1431%   to write, but this does not work correctly if the user wishes to
 1432%   expand `X:Y` while interpreting `X` not   as the module in which
 1433%   to run `Y`. This happens in the  CQL package. Possibly we should
 1434%   disallow this reinterpretation?
 1435
 1436prepare_goal(ID, Goal0, Module:Goal, Options) :-
 1437    option(bindings(Bindings), Options, []),
 1438    b_setval('$variable_names', Bindings),
 1439    (   prepare_goal(Goal0, Goal1, Options)
 1440    ->  true
 1441    ;   Goal1 = Goal0
 1442    ),
 1443    get_pengine_module(ID, Module),
 1444    setup_call_cleanup(
 1445        '$set_source_module'(Old, Module),
 1446        expand_goal(Goal1, Goal),
 1447        '$set_source_module'(_, Old)),
 1448    (   pengine_not_sandboxed(ID)
 1449    ->  true
 1450    ;   get_pengine_application(ID, App),
 1451        setting(App:safe_goal_limit, Limit),
 1452        catch(call_with_time_limit(
 1453                  Limit,
 1454                  safe_goal(Module:Goal)), E, true)
 1455    ->  (   var(E)
 1456        ->  true
 1457        ;   E = time_limit_exceeded
 1458        ->  throw(error(sandbox(time_limit_exceeded, Limit),_))
 1459        ;   throw(E)
 1460        )
 1461    ).
 1462
 1463
 1464%%  prepare_goal(+Goal0, -Goal1, +Options) is semidet.
 1465%
 1466%   Pre-preparation hook for running Goal0. The hook runs in the context
 1467%   of the pengine. Goal is the raw   goal  given to _ask_. The returned
 1468%   Goal1 is subject  to  goal   expansion  (expand_goal/2)  and sandbox
 1469%   validation (safe_goal/1) prior to  execution.   If  this goal fails,
 1470%   Goal0 is used for further processing.
 1471%
 1472%   @arg Options provides the options as given to _ask_
 1473
 1474
 1475%%  pengine_not_sandboxed(+Pengine) is semidet.
 1476%
 1477%   True when pengine does not operate in sandboxed mode. This implies a
 1478%   user must be  registered  by   authentication_hook/3  and  the  hook
 1479%   pengines:not_sandboxed(User, Application) must succeed.
 1480
 1481pengine_not_sandboxed(ID) :-
 1482    pengine_user(ID, User),
 1483    pengine_property(ID, application(App)),
 1484    not_sandboxed(User, App),
 1485    !.
 1486
 1487%%  not_sandboxed(+User, +Application) is semidet.
 1488%
 1489%   This hook is called to see whether the Pengine must be executed in a
 1490%   protected environment. It is only called after authentication_hook/3
 1491%   has confirmed the authentity  of  the   current  user.  If this hook
 1492%   succeeds, both loading the code and  executing the query is executed
 1493%   without enforcing sandbox security.  Typically, one should:
 1494%
 1495%     1. Provide a safe user authentication hook.
 1496%     2. Enable HTTPS in the server or put it behind an HTTPS proxy and
 1497%        ensure that the network between the proxy and the pengine
 1498%        server can be trusted.
 1499
 1500
 1501/** pengine_pull_response(+Pengine, +Options) is det
 1502
 1503Pulls a response (an event term) from the  slave Pengine if Pengine is a
 1504remote process, else does nothing at all.
 1505*/
 1506
 1507pengine_pull_response(Pengine, Options) :-
 1508    pengine_remote(Pengine, Server),
 1509    !,
 1510    remote_pengine_pull_response(Server, Pengine, Options).
 1511pengine_pull_response(_ID, _Options).
 1512
 1513
 1514/** pengine_input(+Prompt, -Term) is det
 1515
 1516Sends Prompt to the master (parent) pengine and waits for input. Note that Prompt may be
 1517any term, compound as well as atomic.
 1518*/
 1519
 1520pengine_input(Prompt, Term) :-
 1521    pengine_self(Self),
 1522    pengine_parent(Parent),
 1523    pengine_reply(Parent, prompt(Self, Prompt)),
 1524    pengine_request(Request),
 1525    (   Request = input(Input)
 1526    ->  Term = Input
 1527    ;   Request == destroy
 1528    ->  abort
 1529    ;   throw(error(protocol_error,_))
 1530    ).
 1531
 1532
 1533/** pengine_respond(+Pengine, +Input, +Options) is det
 1534
 1535Sends a response in the form of the term Input to a slave (child) pengine
 1536that has prompted its master (parent) for input.
 1537
 1538Defined in terms of pengine_send/3, as follows:
 1539
 1540==
 1541pengine_respond(Pengine, Input, Options) :-
 1542    pengine_send(Pengine, input(Input), Options).
 1543==
 1544
 1545*/
 1546
 1547pengine_respond(Pengine, Input, Options) :-
 1548    pengine_send(Pengine, input(Input), Options).
 1549
 1550
 1551%!  send_error(+Error) is det.
 1552%
 1553%   Send an error to my parent.   Remove non-readable blobs from the
 1554%   error term first using replace_blobs/2. If  the error contains a
 1555%   stack-trace, this is resolved to a string before sending.
 1556
 1557send_error(error(Formal, context(prolog_stack(Frames), Message))) :-
 1558    is_list(Frames),
 1559    !,
 1560    with_output_to(string(Stack),
 1561                   print_prolog_backtrace(current_output, Frames)),
 1562    pengine_self(Self),
 1563    replace_blobs(Formal, Formal1),
 1564    replace_blobs(Message, Message1),
 1565    pengine_reply(error(Self, error(Formal1,
 1566                                    context(prolog_stack(Stack), Message1)))).
 1567send_error(Error) :-
 1568    pengine_self(Self),
 1569    replace_blobs(Error, Error1),
 1570    pengine_reply(error(Self, Error1)).
 1571
 1572%!  replace_blobs(Term0, Term) is det.
 1573%
 1574%   Copy Term0 to Term, replacing non-text   blobs. This is required
 1575%   for error messages that may hold   streams  and other handles to
 1576%   non-readable objects.
 1577
 1578replace_blobs(Blob, Atom) :-
 1579    blob(Blob, Type), Type \== text,
 1580    !,
 1581    format(atom(Atom), '~p', [Blob]).
 1582replace_blobs(Term0, Term) :-
 1583    compound(Term0),
 1584    !,
 1585    compound_name_arguments(Term0, Name, Args0),
 1586    maplist(replace_blobs, Args0, Args),
 1587    compound_name_arguments(Term, Name, Args).
 1588replace_blobs(Term, Term).
 1589
 1590
 1591/*================= Remote pengines =======================
 1592*/
 1593
 1594
 1595remote_pengine_create(BaseURL, Options) :-
 1596    partition(pengine_create_option, Options, PengineOptions0, RestOptions),
 1597        (       option(ask(Query), PengineOptions0),
 1598                \+ option(template(_Template), PengineOptions0)
 1599        ->      PengineOptions = [template(Query)|PengineOptions0]
 1600        ;       PengineOptions = PengineOptions0
 1601        ),
 1602    options_to_dict(PengineOptions, PostData),
 1603    remote_post_rec(BaseURL, create, PostData, Reply, RestOptions),
 1604    arg(1, Reply, ID),
 1605    (   option(id(ID2), Options)
 1606    ->  ID = ID2
 1607    ;   true
 1608    ),
 1609    option(alias(Name), Options, ID),
 1610    assert(child(Name, ID)),
 1611    (   (   functor(Reply, create, _)   % actually created
 1612        ;   functor(Reply, output, _)   % compiler messages
 1613        )
 1614    ->  option(application(Application), PengineOptions, pengine_sandbox),
 1615        option(destroy(Destroy), PengineOptions, true),
 1616        pengine_register_remote(ID, BaseURL, Application, Destroy)
 1617    ;   true
 1618    ),
 1619    thread_self(Queue),
 1620    pengine_reply(Queue, Reply).
 1621
 1622options_to_dict(Options, Dict) :-
 1623    select_option(ask(Ask), Options, Options1),
 1624    select_option(template(Template), Options1, Options2),
 1625    !,
 1626    no_numbered_var_in(Ask+Template),
 1627    findall(AskString-TemplateString,
 1628            ask_template_to_strings(Ask, Template, AskString, TemplateString),
 1629            [ AskString-TemplateString ]),
 1630    options_to_dict(Options2, Dict0),
 1631    Dict = Dict0.put(_{ask:AskString,template:TemplateString}).
 1632options_to_dict(Options, Dict) :-
 1633    maplist(prolog_option, Options, Options1),
 1634    dict_create(Dict, _, Options1).
 1635
 1636no_numbered_var_in(Term) :-
 1637    sub_term(Sub, Term),
 1638    subsumes_term('$VAR'(_), Sub),
 1639    !,
 1640    domain_error(numbered_vars_free_term, Term).
 1641no_numbered_var_in(_).
 1642
 1643ask_template_to_strings(Ask, Template, AskString, TemplateString) :-
 1644    numbervars(Ask+Template, 0, _),
 1645    WOpts = [ numbervars(true), ignore_ops(true), quoted(true) ],
 1646    format(string(AskTemplate), '~W\n~W', [ Ask, WOpts,
 1647                                            Template, WOpts
 1648                                          ]),
 1649    split_string(AskTemplate, "\n", "", [AskString, TemplateString]).
 1650
 1651prolog_option(Option0, Option) :-
 1652    create_option_type(Option0, term),
 1653    !,
 1654    Option0 =.. [Name,Value],
 1655    format(string(String), '~k', [Value]),
 1656    Option =.. [Name,String].
 1657prolog_option(Option, Option).
 1658
 1659create_option_type(ask(_),         term).
 1660create_option_type(template(_),    term).
 1661create_option_type(application(_), atom).
 1662
 1663remote_pengine_send(BaseURL, ID, Event, Options) :-
 1664    remote_send_rec(BaseURL, send, ID, [event=Event], Reply, Options),
 1665    thread_self(Queue),
 1666    pengine_reply(Queue, Reply).
 1667
 1668remote_pengine_pull_response(BaseURL, ID, Options) :-
 1669    remote_send_rec(BaseURL, pull_response, ID, [], Reply, Options),
 1670    thread_self(Queue),
 1671    pengine_reply(Queue, Reply).
 1672
 1673remote_pengine_abort(BaseURL, ID, Options) :-
 1674    remote_send_rec(BaseURL, abort, ID, [], Reply, Options),
 1675    thread_self(Queue),
 1676    pengine_reply(Queue, Reply).
 1677
 1678%!  remote_send_rec(+Server, +Action, +ID, +Params, -Reply, +Options)
 1679%
 1680%   Issue a GET request on Server and   unify Reply with the replied
 1681%   term.
 1682
 1683remote_send_rec(Server, Action, ID, [event=Event], Reply, Options) :-
 1684    !,
 1685    server_url(Server, Action, [id=ID], URL),
 1686    http_open(URL, Stream,              % putting this in setup_call_cleanup/3
 1687              [ post(prolog(Event))     % makes it impossible to interrupt.
 1688              | Options
 1689              ]),
 1690    call_cleanup(
 1691        read_prolog_reply(Stream, Reply),
 1692        close(Stream)).
 1693remote_send_rec(Server, Action, ID, Params, Reply, Options) :-
 1694    server_url(Server, Action, [id=ID|Params], URL),
 1695    http_open(URL, Stream, Options),
 1696    call_cleanup(
 1697        read_prolog_reply(Stream, Reply),
 1698        close(Stream)).
 1699
 1700remote_post_rec(Server, Action, Data, Reply, Options) :-
 1701    server_url(Server, Action, [], URL),
 1702    probe(Action, URL),
 1703    http_open(URL, Stream,
 1704              [ post(json(Data))
 1705              | Options
 1706              ]),
 1707    call_cleanup(
 1708        read_prolog_reply(Stream, Reply),
 1709        close(Stream)).
 1710
 1711%!  probe(+Action, +URL) is det.
 1712%
 1713%   Probe the target. This is a  good   idea  before posting a large
 1714%   document and be faced with an authentication challenge. Possibly
 1715%   we should make this an option for simpler scenarios.
 1716
 1717probe(create, URL) :-
 1718    !,
 1719    http_open(URL, Stream, [method(options)]),
 1720    close(Stream).
 1721probe(_, _).
 1722
 1723read_prolog_reply(In, Reply) :-
 1724    set_stream(In, encoding(utf8)),
 1725    read(In, Reply0),
 1726    rebind_cycles(Reply0, Reply).
 1727
 1728rebind_cycles(@(Reply, Bindings), Reply) :-
 1729    is_list(Bindings),
 1730    !,
 1731    maplist(bind, Bindings).
 1732rebind_cycles(Reply, Reply).
 1733
 1734bind(Var = Value) :-
 1735    Var = Value.
 1736
 1737server_url(Server, Action, Params, URL) :-
 1738    uri_components(Server, Components0),
 1739    uri_query_components(Query, Params),
 1740    uri_data(path, Components0, Path0),
 1741    atom_concat('pengine/', Action, PAction),
 1742    directory_file_path(Path0, PAction, Path),
 1743    uri_data(path, Components0, Path, Components),
 1744    uri_data(search, Components, Query),
 1745    uri_components(URL, Components).
 1746
 1747
 1748/** pengine_event(?EventTerm) is det.
 1749    pengine_event(?EventTerm, +Options) is det.
 1750
 1751Examines the pengine's event queue  and   if  necessary blocks execution
 1752until a term that unifies to Term  arrives   in  the queue. After a term
 1753from the queue has been unified to Term,   the  term is deleted from the
 1754queue.
 1755
 1756   Valid options are:
 1757
 1758   * timeout(+Time)
 1759     Time is a float or integer and specifies the maximum time to wait
 1760     in seconds. If no event has arrived before the time is up EventTerm
 1761     is bound to the atom =timeout=.
 1762   * listen(+Id)
 1763     Only listen to events from the pengine identified by Id.
 1764*/
 1765
 1766pengine_event(Event) :-
 1767    pengine_event(Event, []).
 1768
 1769pengine_event(Event, Options) :-
 1770    thread_self(Self),
 1771    option(listen(Id), Options, _),
 1772    (   thread_get_message(Self, pengine_event(Id, Event), Options)
 1773    ->  true
 1774    ;   Event = timeout
 1775    ),
 1776    update_remote_destroy(Event).
 1777
 1778update_remote_destroy(Event) :-
 1779    destroy_event(Event),
 1780    arg(1, Event, Id),
 1781    pengine_remote(Id, _Server),
 1782    !,
 1783    pengine_unregister_remote(Id).
 1784update_remote_destroy(_).
 1785
 1786destroy_event(destroy(_)).
 1787destroy_event(destroy(_,_)).
 1788destroy_event(create(_,Features)) :-
 1789    memberchk(answer(Answer), Features),
 1790    !,
 1791    nonvar(Answer),
 1792    destroy_event(Answer).
 1793
 1794
 1795/** pengine_event_loop(:Closure, +Options) is det
 1796
 1797Starts an event loop accepting event terms   sent to the current pengine
 1798or thread. For each such  event   E,  calls  ignore(call(Closure, E)). A
 1799closure thus acts as a _handler_  for   the  event. Some events are also
 1800treated specially:
 1801
 1802   * create(ID, Term)
 1803     The ID is placed in a list of active pengines.
 1804
 1805   * destroy(ID)
 1806     The ID is removed from the list of active pengines. When the last
 1807     pengine ID is removed, the loop terminates.
 1808
 1809   * output(ID, Term)
 1810     The predicate pengine_pull_response/2 is called.
 1811
 1812Valid options are:
 1813
 1814   * autoforward(+To)
 1815     Forwards received event terms to slaves. To is either =all=,
 1816     =all_but_sender= or a Prolog list of NameOrIDs. [not yet
 1817     implemented]
 1818
 1819*/
 1820
 1821pengine_event_loop(Closure, Options) :-
 1822    child(_,_),
 1823    !,
 1824    pengine_event(Event),
 1825    (   option(autoforward(all), Options) % TODO: Implement all_but_sender and list of IDs
 1826    ->  forall(child(_,ID), pengine_send(ID, Event))
 1827    ;   true
 1828    ),
 1829    pengine_event_loop(Event, Closure, Options).
 1830pengine_event_loop(_, _).
 1831
 1832:- meta_predicate
 1833    pengine_process_event(+, 1, -, +). 1834
 1835pengine_event_loop(Event, Closure, Options) :-
 1836    pengine_process_event(Event, Closure, Continue, Options),
 1837    (   Continue == true
 1838    ->  pengine_event_loop(Closure, Options)
 1839    ;   true
 1840    ).
 1841
 1842pengine_process_event(create(ID, T), Closure, Continue, Options) :-
 1843    debug(pengine(transition), '~q: 1 = /~q => 2', [ID, create(T)]),
 1844    (   select(answer(First), T, T1)
 1845    ->  ignore(call(Closure, create(ID, T1))),
 1846        pengine_process_event(First, Closure, Continue, Options)
 1847    ;   ignore(call(Closure, create(ID, T))),
 1848        Continue = true
 1849    ).
 1850pengine_process_event(output(ID, Msg), Closure, true, _Options) :-
 1851    debug(pengine(transition), '~q: 3 = /~q => 4', [ID, output(Msg)]),
 1852    ignore(call(Closure, output(ID, Msg))),
 1853    pengine_pull_response(ID, []).
 1854pengine_process_event(debug(ID, Msg), Closure, true, _Options) :-
 1855    debug(pengine(transition), '~q: 3 = /~q => 4', [ID, debug(Msg)]),
 1856    ignore(call(Closure, debug(ID, Msg))),
 1857    pengine_pull_response(ID, []).
 1858pengine_process_event(prompt(ID, Term), Closure, true, _Options) :-
 1859    debug(pengine(transition), '~q: 3 = /~q => 5', [ID, prompt(Term)]),
 1860    ignore(call(Closure, prompt(ID, Term))).
 1861pengine_process_event(success(ID, Sol, _Proj, _Time, More), Closure, true, _) :-
 1862    debug(pengine(transition), '~q: 3 = /~q => 6/2', [ID, success(Sol, More)]),
 1863    ignore(call(Closure, success(ID, Sol, More))).
 1864pengine_process_event(failure(ID, _Time), Closure, true, _Options) :-
 1865    debug(pengine(transition), '~q: 3 = /~q => 2', [ID, failure]),
 1866    ignore(call(Closure, failure(ID))).
 1867pengine_process_event(error(ID, Error), Closure, Continue, _Options) :-
 1868    debug(pengine(transition), '~q: 3 = /~q => 2', [ID, error(Error)]),
 1869    (   call(Closure, error(ID, Error))
 1870    ->  Continue = true
 1871    ;   forall(child(_,Child), pengine_destroy(Child)),
 1872        throw(Error)
 1873    ).
 1874pengine_process_event(stop(ID), Closure, true, _Options) :-
 1875    debug(pengine(transition), '~q: 7 = /~q => 2', [ID, stop]),
 1876    ignore(call(Closure, stop(ID))).
 1877pengine_process_event(destroy(ID, Event), Closure, Continue, Options) :-
 1878    pengine_process_event(Event, Closure, _, Options),
 1879    pengine_process_event(destroy(ID), Closure, Continue, Options).
 1880pengine_process_event(destroy(ID), Closure, true, _Options) :-
 1881    retractall(child(_,ID)),
 1882    debug(pengine(transition), '~q: 1 = /~q => 0', [ID, destroy]),
 1883    ignore(call(Closure, destroy(ID))).
 1884
 1885
 1886/** pengine_rpc(+URL, +Query) is nondet.
 1887    pengine_rpc(+URL, +Query, +Options) is nondet.
 1888
 1889Semantically equivalent to the sequence below,  except that the query is
 1890executed in (and in the Prolog context   of) the pengine server referred
 1891to by URL, rather than locally.
 1892
 1893  ==
 1894    copy_term_nat(Query, Copy),  % attributes are not copied to the server
 1895    call(Copy),			 % executed on server at URL
 1896    Query = Copy.
 1897  ==
 1898
 1899Valid options are:
 1900
 1901    - chunk(+Integer)
 1902      Can be used to reduce the number of network roundtrips being made.
 1903      See pengine_ask/3.
 1904    - timeout(+Time)
 1905      Wait at most Time seconds for the next event from the server.
 1906      The default is defined by the setting `pengines:time_limit`.
 1907
 1908Remaining  options  (except   the   server    option)   are   passed  to
 1909pengine_create/1.
 1910*/
 1911
 1912pengine_rpc(URL, Query) :-
 1913    pengine_rpc(URL, Query, []).
 1914
 1915pengine_rpc(URL, Query, M:Options0) :-
 1916    translate_local_sources(Options0, Options1, M),
 1917    (  option(timeout(_), Options1)
 1918    -> Options = Options1
 1919    ;  setting(time_limit, Limit),
 1920       Options = [timeout(Limit)|Options1]
 1921    ),
 1922    term_variables(Query, Vars),
 1923    Template =.. [v|Vars],
 1924    State = destroy(true),              % modified by process_event/4
 1925    setup_call_catcher_cleanup(
 1926        pengine_create([ ask(Query),
 1927                         template(Template),
 1928                         server(URL),
 1929                         id(Id)
 1930                       | Options
 1931                       ]),
 1932        wait_event(Template, State, [listen(Id)|Options]),
 1933        Why,
 1934        pengine_destroy_and_wait(State, Id, Why)).
 1935
 1936pengine_destroy_and_wait(destroy(true), Id, Why) :-
 1937    !,
 1938    debug(pengine(rpc), 'Destroying RPC because of ~p', [Why]),
 1939    pengine_destroy(Id),
 1940    wait_destroy(Id, 10).
 1941pengine_destroy_and_wait(_, _, Why) :-
 1942    debug(pengine(rpc), 'Not destroying RPC (~p)', [Why]).
 1943
 1944wait_destroy(Id, _) :-
 1945    \+ child(_, Id),
 1946    !.
 1947wait_destroy(Id, N) :-
 1948    pengine_event(Event, [listen(Id),timeout(10)]),
 1949    !,
 1950    (   destroy_event(Event)
 1951    ->  retractall(child(_,Id))
 1952    ;   succ(N1, N)
 1953    ->  wait_destroy(Id, N1)
 1954    ;   debug(pengine(rpc), 'RPC did not answer to destroy ~p', [Id]),
 1955        pengine_unregister_remote(Id),
 1956        retractall(child(_,Id))
 1957    ).
 1958
 1959wait_event(Template, State, Options) :-
 1960    pengine_event(Event, Options),
 1961    debug(pengine(event), 'Received ~p', [Event]),
 1962    process_event(Event, Template, State, Options).
 1963
 1964process_event(create(_ID, Features), Template, State, Options) :-
 1965    memberchk(answer(First), Features),
 1966    process_event(First, Template, State, Options).
 1967process_event(error(_ID, Error), _Template, _, _Options) :-
 1968    throw(Error).
 1969process_event(failure(_ID, _Time), _Template, _, _Options) :-
 1970    fail.
 1971process_event(prompt(ID, Prompt), Template, State, Options) :-
 1972    pengine_rpc_prompt(ID, Prompt, Reply),
 1973    pengine_send(ID, input(Reply)),
 1974    wait_event(Template, State, Options).
 1975process_event(output(ID, Term), Template, State, Options) :-
 1976    pengine_rpc_output(ID, Term),
 1977    pengine_pull_response(ID, Options),
 1978    wait_event(Template, State, Options).
 1979process_event(debug(ID, Message), Template, State, Options) :-
 1980    debug(pengine(debug), '~w', [Message]),
 1981    pengine_pull_response(ID, Options),
 1982    wait_event(Template, State, Options).
 1983process_event(success(_ID, Solutions, _Proj, _Time, false),
 1984              Template, _, _Options) :-
 1985    !,
 1986    member(Template, Solutions).
 1987process_event(success(ID, Solutions, _Proj, _Time, true),
 1988              Template, State, Options) :-
 1989    (   member(Template, Solutions)
 1990    ;   pengine_next(ID, Options),
 1991        wait_event(Template, State, Options)
 1992    ).
 1993process_event(destroy(ID, Event), Template, State, Options) :-
 1994    !,
 1995    retractall(child(_,ID)),
 1996    nb_setarg(1, State, false),
 1997    debug(pengine(destroy), 'State: ~p~n', [State]),
 1998    process_event(Event, Template, State, Options).
 1999% compatibility with older versions of the protocol.
 2000process_event(success(ID, Solutions, Time, More),
 2001              Template, State, Options) :-
 2002    process_event(success(ID, Solutions, _Proj, Time, More),
 2003                  Template, State, Options).
 2004
 2005
 2006pengine_rpc_prompt(ID, Prompt, Term) :-
 2007    prompt(ID, Prompt, Term0),
 2008    !,
 2009    Term = Term0.
 2010pengine_rpc_prompt(_ID, Prompt, Term) :-
 2011    setup_call_cleanup(
 2012        prompt(Old, Prompt),
 2013        read(Term),
 2014        prompt(_, Old)).
 2015
 2016pengine_rpc_output(ID, Term) :-
 2017    output(ID, Term),
 2018    !.
 2019pengine_rpc_output(_ID, Term) :-
 2020    print(Term).
 2021
 2022%%  prompt(+ID, +Prompt, -Term) is semidet.
 2023%
 2024%   Hook to handle pengine_input/2 from the remote pengine. If the hooks
 2025%   fails, pengine_rpc/3 calls read/1 using the current prompt.
 2026
 2027:- multifile prompt/3. 2028
 2029%%  output(+ID, +Term) is semidet.
 2030%
 2031%   Hook to handle pengine_output/1 from the remote pengine. If the hook
 2032%   fails, it calls print/1 on Term.
 2033
 2034:- multifile output/2. 2035
 2036
 2037/*================= HTTP handlers =======================
 2038*/
 2039
 2040%   Declare  HTTP  locations  we  serve  and   how.  Note  that  we  use
 2041%   time_limit(inifinite) because pengines have their  own timeout. Also
 2042%   note that we use spawn. This  is   needed  because we can easily get
 2043%   many clients waiting for  some  action   on  a  pengine to complete.
 2044%   Without spawning, we would quickly exhaust   the  worker pool of the
 2045%   HTTP server.
 2046%
 2047%   FIXME: probably we should wait for a   short time for the pengine on
 2048%   the default worker thread. Only if  that   time  has expired, we can
 2049%   call http_spawn/2 to continue waiting on   a  new thread. That would
 2050%   improve the performance and reduce the usage of threads.
 2051
 2052:- http_handler(root(pengine),               http_404([]),
 2053                [ id(pengines) ]). 2054:- http_handler(root(pengine/create),        http_pengine_create,
 2055                [ time_limit(infinite), spawn([]) ]). 2056:- http_handler(root(pengine/send),          http_pengine_send,
 2057                [ time_limit(infinite), spawn([]) ]). 2058:- http_handler(root(pengine/pull_response), http_pengine_pull_response,
 2059                [ time_limit(infinite), spawn([]) ]). 2060:- http_handler(root(pengine/abort),         http_pengine_abort,         []). 2061:- http_handler(root(pengine/ping),          http_pengine_ping,          []). 2062:- http_handler(root(pengine/destroy_all),   http_pengine_destroy_all,   []). 2063
 2064:- http_handler(root(pengine/'pengines.js'),
 2065                http_reply_file(library('http/web/js/pengines.js'), []), []). 2066:- http_handler(root(pengine/'plterm.css'),
 2067                http_reply_file(library('http/web/css/plterm.css'), []), []). 2068
 2069
 2070%%  http_pengine_create(+Request)
 2071%
 2072%   HTTP POST handler  for  =/pengine/create=.   This  API  accepts  the
 2073%   pengine  creation  parameters  both  as  =application/json=  and  as
 2074%   =www-form-encoded=.  Accepted parameters:
 2075%
 2076%     | **Parameter** | **Default**       | **Comment**                |
 2077%     |---------------|-------------------|----------------------------|
 2078%     | format        | `prolog`          | Output format              |
 2079%     | application   | `pengine_sandbox` | Pengine application        |
 2080%     | chunk         | 1                 | Chunk-size for results     |
 2081%     | solutions     | chunked           | If `all`, emit all results |
 2082%     | ask           | -                 | The query                  |
 2083%     | template      | -                 | Output template            |
 2084%     | src_text      | ""                | Program                    |
 2085%     | src_url       | -                 | Program to download        |
 2086%     | disposition   | -                 | Download location          |
 2087%
 2088%     Note that solutions=all internally  uses   chunking  to obtain the
 2089%     results from the pengine, but the results are combined in a single
 2090%     HTTP reply. This is currently only  implemented by the CSV backend
 2091%     that is part of SWISH for   downloading unbounded result sets with
 2092%     limited memory resources.
 2093
 2094http_pengine_create(Request) :-
 2095    reply_options(Request, [post]),
 2096    !.
 2097http_pengine_create(Request) :-
 2098    memberchk(content_type(CT), Request),
 2099    sub_atom(CT, 0, _, _, 'application/json'),
 2100    !,
 2101    http_read_json_dict(Request, Dict),
 2102    dict_atom_option(format, Dict, Format, prolog),
 2103    dict_atom_option(application, Dict, Application, pengine_sandbox),
 2104    http_pengine_create(Request, Application, Format, Dict).
 2105http_pengine_create(Request) :-
 2106    Optional = [optional(true)],
 2107    OptString = [string|Optional],
 2108    Form = [ format(Format, [default(prolog)]),
 2109             application(Application, [default(pengine_sandbox)]),
 2110             chunk(_, [integer, default(1)]),
 2111             solutions(_, [oneof([all,chunked]), default(chunked)]),
 2112             ask(_, OptString),
 2113             template(_, OptString),
 2114             src_text(_, OptString),
 2115             disposition(_, OptString),
 2116             src_url(_, Optional)
 2117           ],
 2118    http_parameters(Request, Form),
 2119    form_dict(Form, Dict),
 2120    http_pengine_create(Request, Application, Format, Dict).
 2121
 2122dict_atom_option(Key, Dict, Atom, Default) :-
 2123    (   get_dict(Key, Dict, String)
 2124    ->  atom_string(Atom, String)
 2125    ;   Atom = Default
 2126    ).
 2127
 2128form_dict(Form, Dict) :-
 2129    form_values(Form, Pairs),
 2130    dict_pairs(Dict, _, Pairs).
 2131
 2132form_values([], []).
 2133form_values([H|T], Pairs) :-
 2134    arg(1, H, Value),
 2135    nonvar(Value),
 2136    !,
 2137    functor(H, Name, _),
 2138    Pairs = [Name-Value|PairsT],
 2139    form_values(T, PairsT).
 2140form_values([_|T], Pairs) :-
 2141    form_values(T, Pairs).
 2142
 2143%!  http_pengine_create(+Request, +Application, +Format, +OptionsDict)
 2144
 2145
 2146http_pengine_create(Request, Application, Format, Dict) :-
 2147    current_application(Application),
 2148    !,
 2149    allowed(Request, Application),
 2150    authenticate(Request, Application, UserOptions),
 2151    dict_to_options(Dict, Application, CreateOptions0),
 2152    append(UserOptions, CreateOptions0, CreateOptions),
 2153    pengine_uuid(Pengine),
 2154    message_queue_create(Queue, [max_size(25)]),
 2155    setting(Application:time_limit, TimeLimit),
 2156    get_time(Now),
 2157    asserta(pengine_queue(Pengine, Queue, TimeLimit, Now)),
 2158    broadcast(pengine(create(Pengine, Application, CreateOptions))),
 2159    create(Queue, Pengine, CreateOptions, http, Application),
 2160    create_wait_and_output_result(Pengine, Queue, Format,
 2161                                  TimeLimit, Dict),
 2162    gc_abandoned_queues.
 2163http_pengine_create(_Request, Application, Format, _Dict) :-
 2164    Error = existence_error(pengine_application, Application),
 2165    pengine_uuid(ID),
 2166    output_result(Format, error(ID, error(Error, _))).
 2167
 2168
 2169dict_to_options(Dict, Application, CreateOptions) :-
 2170    dict_pairs(Dict, _, Pairs),
 2171    pairs_create_options(Pairs, Application, CreateOptions).
 2172
 2173pairs_create_options([], _, []) :- !.
 2174pairs_create_options([N-V0|T0], App, [Opt|T]) :-
 2175    Opt =.. [N,V],
 2176    pengine_create_option(Opt), N \== user,
 2177    !,
 2178    (   create_option_type(Opt, atom)
 2179    ->  atom_string(V, V0)               % term creation must be done if
 2180    ;   V = V0                           % we created the source and know
 2181    ),                                   % the operators.
 2182    pairs_create_options(T0, App, T).
 2183pairs_create_options([_|T0], App, T) :-
 2184    pairs_create_options(T0, App, T).
 2185
 2186%!  wait_and_output_result(+Pengine, +Queue,
 2187%!                         +Format, +TimeLimit) is det.
 2188%
 2189%   Wait for the Pengine's Queue and if  there is a message, send it
 2190%   to the requester using  output_result/1.   If  Pengine  does not
 2191%   answer within the time specified   by  the setting =time_limit=,
 2192%   Pengine is aborted and the  result is error(time_limit_exceeded,
 2193%   _).
 2194
 2195wait_and_output_result(Pengine, Queue, Format, TimeLimit) :-
 2196    (   catch(thread_get_message(Queue, pengine_event(_, Event),
 2197                                 [ timeout(TimeLimit)
 2198                                 ]),
 2199              Error, true)
 2200    ->  (   var(Error)
 2201        ->  debug(pengine(wait), 'Got ~q from ~q', [Event, Queue]),
 2202            ignore(destroy_queue_from_http(Pengine, Event, Queue)),
 2203            output_result(Format, Event)
 2204        ;   output_result(Format, died(Pengine))
 2205        )
 2206    ;   time_limit_exceeded(Pengine, Format)
 2207    ).
 2208
 2209%!  create_wait_and_output_result(+Pengine, +Queue, +Format,
 2210%!                                +TimeLimit, +Dict) is det.
 2211%
 2212%   Intercepts  the  `solutions=all'  case    used  for  downloading
 2213%   results. Dict may contain a  `disposition`   key  to  denote the
 2214%   download location.
 2215
 2216create_wait_and_output_result(Pengine, Queue, Format, TimeLimit, Dict) :-
 2217    get_dict(solutions, Dict, all),
 2218    !,
 2219    between(1, infinite, Page),
 2220    (   catch(thread_get_message(Queue, pengine_event(_, Event),
 2221                                 [ timeout(TimeLimit)
 2222                                 ]),
 2223              Error, true)
 2224    ->  (   var(Error)
 2225        ->  debug(pengine(wait), 'Page ~D: got ~q from ~q', [Page, Event, Queue]),
 2226            (   destroy_queue_from_http(Pengine, Event, Queue)
 2227            ->  !, output_result(Format, page(Page, Event))
 2228            ;   is_more_event(Event)
 2229            ->  pengine_thread(Pengine, Thread),
 2230                thread_send_message(Thread, pengine_request(next)),
 2231                output_result(Format, page(Page, Event), Dict),
 2232                fail
 2233            ;   !, output_result(Format, page(Page, Event), Dict)
 2234            )
 2235        ;   !, output_result(Format, died(Pengine))
 2236        )
 2237    ;   !, time_limit_exceeded(Pengine, Format)
 2238    ),
 2239    !.
 2240create_wait_and_output_result(Pengine, Queue, Format, TimeLimit, _Dict) :-
 2241    wait_and_output_result(Pengine, Queue, Format, TimeLimit).
 2242
 2243is_more_event(success(_Id, _Answers, _Projection, _Time, true)).
 2244is_more_event(create(_, Options)) :-
 2245    memberchk(answer(Event), Options),
 2246    is_more_event(Event).
 2247
 2248
 2249
 2250%!  time_limit_exceeded(+Pengine, +Format)
 2251%
 2252%   The Pengine did not reply within its time limit. Send a reply to the
 2253%   client in the requested format and interrupt the Pengine.
 2254%
 2255%   @bug Ideally, if the Pengine has `destroy` set to `false`, we should
 2256%   get the Pengine back to its main   loop.  Unfortunately we only have
 2257%   normal exceptions that may be  caught   by  the  Pengine and `abort`
 2258%   which cannot be caught and thus destroys the Pengine.
 2259
 2260time_limit_exceeded(Pengine, Format) :-
 2261    call_cleanup(
 2262        pengine_destroy(Pengine, [force(true)]),
 2263        output_result(Format,
 2264                      destroy(Pengine,
 2265                              error(Pengine, time_limit_exceeded)))).
 2266
 2267
 2268%!  destroy_queue_from_http(+Pengine, +Event, +Queue) is semidet.
 2269%
 2270%   Consider destroying the output queue   for Pengine after sending
 2271%   Event back to the HTTP client. We can destroy the queue if
 2272%
 2273%     - The pengine already died (output_queue/3 is present) and
 2274%       the queue is empty.
 2275%     - This is a final (destroy) event.
 2276%
 2277%   @tbd    If the client did not request all output, the queue will
 2278%           not be destroyed.  We need some timeout and GC for that.
 2279
 2280destroy_queue_from_http(ID, _, Queue) :-
 2281    output_queue(ID, Queue, _),
 2282    !,
 2283    destroy_queue_if_empty(Queue).
 2284destroy_queue_from_http(ID, Event, Queue) :-
 2285    debug(pengine(destroy), 'DESTROY? ~p', [Event]),
 2286    is_destroy_event(Event),
 2287    !,
 2288    message_queue_property(Queue, size(Waiting)),
 2289    debug(pengine(destroy), 'Destroy ~p (waiting ~D)', [Queue, Waiting]),
 2290    with_mutex(pengine, sync_destroy_queue_from_http(ID, Queue)).
 2291
 2292is_destroy_event(destroy(_)).
 2293is_destroy_event(destroy(_,_)).
 2294is_destroy_event(create(_, Options)) :-
 2295    memberchk(answer(Event), Options),
 2296    is_destroy_event(Event).
 2297
 2298destroy_queue_if_empty(Queue) :-
 2299    thread_peek_message(Queue, _),
 2300    !.
 2301destroy_queue_if_empty(Queue) :-
 2302    retractall(output_queue(_, Queue, _)),
 2303    message_queue_destroy(Queue).
 2304
 2305%!  gc_abandoned_queues
 2306%
 2307%   Check whether there are queues  that   have  been abadoned. This
 2308%   happens if the stream contains output events and not all of them
 2309%   are read by the client.
 2310
 2311:- dynamic
 2312    last_gc/1. 2313
 2314gc_abandoned_queues :-
 2315    consider_queue_gc,
 2316    !,
 2317    get_time(Now),
 2318    (   output_queue(_, Queue, Time),
 2319        Now-Time > 15*60,
 2320        retract(output_queue(_, Queue, Time)),
 2321        message_queue_destroy(Queue),
 2322        fail
 2323    ;   retractall(last_gc(_)),
 2324        asserta(last_gc(Now))
 2325    ).
 2326gc_abandoned_queues.
 2327
 2328consider_queue_gc :-
 2329    predicate_property(output_queue(_,_,_), number_of_clauses(N)),
 2330    N > 100,
 2331    (   last_gc(Time),
 2332        get_time(Now),
 2333        Now-Time > 5*60
 2334    ->  true
 2335    ;   \+ last_gc(_)
 2336    ).
 2337
 2338%!  sync_destroy_queue_from_http(+Pengine, +Queue) is det.
 2339%!  sync_delay_destroy_queue(+Pengine, +Queue) is det.
 2340%
 2341%   Handle destruction of the message queue connecting the HTTP side
 2342%   to the pengine. We cannot delete the queue when the pengine dies
 2343%   because the queue may contain output  events. Termination of the
 2344%   pengine and finishing the  HTTP  exchange   may  happen  in both
 2345%   orders. This means we need handle this using synchronization.
 2346%
 2347%     * sync_destroy_queue_from_pengine(+Pengine, +Queue)
 2348%     Called (indirectly) from pengine_done/1 if the pengine's
 2349%     thread dies.
 2350%     * sync_destroy_queue_from_http(+Pengine, +Queue)
 2351%     Called from destroy_queue/3, from wait_and_output_result/4,
 2352%     i.e., from the HTTP side.
 2353
 2354:- dynamic output_queue_destroyed/1. 2355
 2356sync_destroy_queue_from_http(ID, Queue) :-
 2357    (   output_queue(ID, Queue, _)
 2358    ->  destroy_queue_if_empty(Queue)
 2359    ;   thread_peek_message(Queue, pengine_event(_, output(_,_)))
 2360    ->  debug(pengine(destroy), 'Delay destruction of ~p because of output',
 2361              [Queue]),
 2362        get_time(Now),
 2363        asserta(output_queue(ID, Queue, Now))
 2364    ;   message_queue_destroy(Queue),
 2365        asserta(output_queue_destroyed(Queue))
 2366    ).
 2367
 2368%!  sync_destroy_queue_from_pengine(+Pengine, +Queue)
 2369%
 2370%   Called  from  pengine_unregister/1  when    the  pengine  thread
 2371%   terminates. It is called while the mutex `pengine` held.
 2372
 2373sync_destroy_queue_from_pengine(ID, Queue) :-
 2374    (   retract(output_queue_destroyed(Queue))
 2375    ->  true
 2376    ;   get_time(Now),
 2377        asserta(output_queue(ID, Queue, Now))
 2378    ),
 2379    retractall(pengine_queue(ID, Queue, _, _)).
 2380
 2381
 2382http_pengine_send(Request) :-
 2383    reply_options(Request, [get,post]),
 2384    !.
 2385http_pengine_send(Request) :-
 2386    http_parameters(Request,
 2387                    [ id(ID, [ type(atom) ]),
 2388                      event(EventString, [optional(true)]),
 2389                      format(Format, [default(prolog)])
 2390                    ]),
 2391    get_pengine_module(ID, Module),
 2392    (   current_module(Module)          % avoid re-creating the module
 2393    ->  catch(( read_event(Request, EventString, Module, Event0, Bindings),
 2394                fix_bindings(Format, Event0, Bindings, Event1)
 2395              ),
 2396              Error,
 2397              true),
 2398        (   var(Error)
 2399        ->  debug(pengine(event), 'HTTP send: ~p', [Event1]),
 2400            (   pengine_thread(ID, Thread)
 2401            ->  pengine_queue(ID, Queue, TimeLimit, _),
 2402                random_delay,
 2403                broadcast(pengine(send(ID, Event1))),
 2404                thread_send_message(Thread, pengine_request(Event1)),
 2405                wait_and_output_result(ID, Queue, Format, TimeLimit)
 2406            ;   atom(ID)
 2407            ->  pengine_died(Format, ID)
 2408            ;   http_404([], Request)
 2409            )
 2410        ;   output_result(Format, error(ID, Error))
 2411        )
 2412    ;   debug(pengine(event), 'Pengine module ~q vanished', [Module]),
 2413        discard_post_data(Request),
 2414        pengine_died(Format, ID)
 2415    ).
 2416
 2417pengine_died(Format, Pengine) :-
 2418    output_result(Format, error(Pengine,
 2419                                error(existence_error(pengine, Pengine),_))).
 2420
 2421
 2422%%  read_event(+Request, +EventString, +Module, -Event, -Bindings)
 2423%
 2424%   Read the sent event. The event is a   Prolog  term that is either in
 2425%   the =event= parameter or as a posted document.
 2426
 2427read_event(_Request, EventString, Module, Event, Bindings) :-
 2428    nonvar(EventString),
 2429    !,
 2430    term_string(Event, EventString,
 2431                [ variable_names(Bindings),
 2432                  module(Module)
 2433                ]).
 2434read_event(Request, _EventString, Module, Event, Bindings) :-
 2435    option(method(post), Request),
 2436    http_read_data(Request,     Event,
 2437                   [ content_type('application/x-prolog'),
 2438                     module(Module),
 2439                     variable_names(Bindings)
 2440                   ]).
 2441
 2442%%  discard_post_data(+Request) is det.
 2443%
 2444%   If this is a POST request, discard the posted data.
 2445
 2446discard_post_data(Request) :-
 2447    option(method(post), Request),
 2448    !,
 2449    setup_call_cleanup(
 2450        open_null_stream(NULL),
 2451        http_read_data(Request, _, [to(stream(NULL))]),
 2452        close(NULL)).
 2453discard_post_data(_).
 2454
 2455%!  fix_bindings(+Format, +EventIn, +Bindings, -Event) is det.
 2456%
 2457%   Generate the template for json(-s) Format  from the variables in
 2458%   the asked Goal. Variables starting  with an underscore, followed
 2459%   by an capital letter are ignored from the template.
 2460
 2461fix_bindings(Format,
 2462             ask(Goal, Options0), Bindings,
 2463             ask(Goal, NewOptions)) :-
 2464    json_lang(Format),
 2465    !,
 2466    exclude(anon, Bindings, NamedBindings),
 2467    template(NamedBindings, Template, Options0, Options1),
 2468    select_option(chunk(Paging), Options1, Options2, 1),
 2469    NewOptions = [ template(Template),
 2470                   chunk(Paging),
 2471                   bindings(NamedBindings)
 2472                 | Options2
 2473                 ].
 2474fix_bindings(_, Command, _, Command).
 2475
 2476template(_, Template, Options0, Options) :-
 2477    select_option(template(Template), Options0, Options),
 2478    !.
 2479template(Bindings, Template, Options, Options) :-
 2480    dict_create(Template, swish_default_template, Bindings).
 2481
 2482anon(Name=_) :-
 2483    sub_atom(Name, 0, _, _, '_'),
 2484    sub_atom(Name, 1, 1, _, Next),
 2485    char_type(Next, prolog_var_start).
 2486
 2487var_name(Name=_, Name).
 2488
 2489
 2490%!  json_lang(+Format) is semidet.
 2491%
 2492%   True if Format is a JSON variation.
 2493
 2494json_lang(json) :- !.
 2495json_lang(Format) :-
 2496    sub_atom(Format, 0, _, _, 'json-').
 2497
 2498%!  http_pengine_pull_response(+Request)
 2499%
 2500%   HTTP handler for /pengine/pull_response.  Pulls possible pending
 2501%   messages from the pengine.
 2502
 2503http_pengine_pull_response(Request) :-
 2504    reply_options(Request, [get]),
 2505    !.
 2506http_pengine_pull_response(Request) :-
 2507    http_parameters(Request,
 2508            [   id(ID, []),
 2509                format(Format, [default(prolog)])
 2510            ]),
 2511    (   (   pengine_queue(ID, Queue, TimeLimit, _)
 2512        ->  true
 2513        ;   output_queue(ID, Queue, _),
 2514            TimeLimit = 0
 2515        )
 2516    ->  wait_and_output_result(ID, Queue, Format, TimeLimit)
 2517    ;   http_404([], Request)
 2518    ).
 2519
 2520%!  http_pengine_abort(+Request)
 2521%
 2522%   HTTP handler for /pengine/abort. Note that  abort may be sent at
 2523%   any time and the reply may  be   handled  by a pull_response. In
 2524%   that case, our  pengine  has  already   died  before  we  get to
 2525%   wait_and_output_result/4.
 2526
 2527http_pengine_abort(Request) :-
 2528    reply_options(Request, [get]),
 2529    !.
 2530http_pengine_abort(Request) :-
 2531    http_parameters(Request,
 2532            [   id(ID, []),
 2533                format(Format, [default(prolog)])
 2534            ]),
 2535    (   pengine_thread(ID, _Thread),
 2536        pengine_queue(ID, Queue, TimeLimit, _)
 2537    ->  broadcast(pengine(abort(ID))),
 2538        abort_pending_output(ID),
 2539        pengine_abort(ID),
 2540        wait_and_output_result(ID, Queue, Format, TimeLimit)
 2541    ;   http_404([], Request)
 2542    ).
 2543
 2544http_pengine_destroy_all(Request) :-
 2545    reply_options(Request, [get]),
 2546    !.
 2547http_pengine_destroy_all(Request) :-
 2548    http_parameters(Request,
 2549                    [ ids(IDsAtom, [])
 2550                    ]),
 2551    atomic_list_concat(IDs, ',', IDsAtom),
 2552    forall(member(ID, IDs),
 2553           pengine_destroy(ID, [force(true)])),
 2554    reply_json("ok").
 2555
 2556%!  http_pengine_ping(+Request)
 2557%
 2558%   HTTP handler for /pengine/ping.  If   the  requested  Pengine is
 2559%   alive and event status(Pengine, Stats) is created, where `Stats`
 2560%   is the return of thread_statistics/2.
 2561
 2562http_pengine_ping(Request) :-
 2563    reply_options(Request, [get]),
 2564    !.
 2565http_pengine_ping(Request) :-
 2566    http_parameters(Request,
 2567                    [ id(Pengine, []),
 2568                      format(Format, [default(prolog)])
 2569                    ]),
 2570    (   pengine_thread(Pengine, Thread),
 2571        catch(thread_statistics(Thread, Stats), _, fail)
 2572    ->  output_result(Format, ping(Pengine, Stats))
 2573    ;   output_result(Format, died(Pengine))
 2574    ).
 2575
 2576
 2577%!  output_result(+Format, +EventTerm) is det.
 2578%!  output_result(+Format, +EventTerm, +OptionsDict) is det.
 2579%
 2580%   Formulate an HTTP response from a pengine event term. Format is
 2581%   one of =prolog=, =json= or =json-s=.
 2582
 2583:- dynamic
 2584    pengine_replying/2.             % +Pengine, +Thread
 2585
 2586output_result(Format, Event) :-
 2587    arg(1, Event, Pengine),
 2588    thread_self(Thread),
 2589    setup_call_cleanup(
 2590        asserta(pengine_replying(Pengine, Thread), Ref),
 2591        catch(output_result(Format, Event, _{}),
 2592              pengine_abort_output,
 2593              true),
 2594        erase(Ref)).
 2595
 2596output_result(prolog, Event, _) :-
 2597    !,
 2598    format('Content-type: text/x-prolog; charset=UTF-8~n~n'),
 2599    write_term(Event,
 2600               [ quoted(true),
 2601                 ignore_ops(true),
 2602                 fullstop(true),
 2603                 blobs(portray),
 2604                 portray_goal(portray_blob),
 2605                 nl(true)
 2606               ]).
 2607output_result(Lang, Event, Dict) :-
 2608    write_result(Lang, Event, Dict),
 2609    !.
 2610output_result(Lang, Event, _) :-
 2611    json_lang(Lang),
 2612    !,
 2613    (   event_term_to_json_data(Event, JSON, Lang)
 2614    ->  cors_enable,
 2615        disable_client_cache,
 2616        reply_json(JSON)
 2617    ;   assertion(event_term_to_json_data(Event, _, Lang))
 2618    ).
 2619output_result(Lang, _Event, _) :-    % FIXME: allow for non-JSON format
 2620    domain_error(pengine_format, Lang).
 2621
 2622%!  portray_blob(+Blob, +Options) is det.
 2623%
 2624%   Portray non-text blobs that may  appear   in  output  terms. Not
 2625%   really sure about that. Basically such  terms need to be avoided
 2626%   as they are meaningless outside the process. The generated error
 2627%   is hard to debug though, so now we send them as `'$BLOB'(Type)`.
 2628%   Future versions may include more info, depending on `Type`.
 2629
 2630:- public portray_blob/2.               % called from write-term
 2631portray_blob(Blob, _Options) :-
 2632    blob(Blob, Type),
 2633    writeq('$BLOB'(Type)).
 2634
 2635%!  abort_pending_output(+Pengine) is det.
 2636%
 2637%   If we get an abort, it is possible that output is being produced
 2638%   for the client.  This predicate aborts these threads.
 2639
 2640abort_pending_output(Pengine) :-
 2641    forall(pengine_replying(Pengine, Thread),
 2642           abort_output_thread(Thread)).
 2643
 2644abort_output_thread(Thread) :-
 2645    catch(thread_signal(Thread, throw(pengine_abort_output)),
 2646          error(existence_error(thread, _), _),
 2647          true).
 2648
 2649%!  write_result(+Lang, +Event, +Dict) is semidet.
 2650%
 2651%   Hook that allows for different output formats. The core Pengines
 2652%   library supports `prolog` and various   JSON  dialects. The hook
 2653%   event_to_json/3 can be used to refine   the  JSON dialects. This
 2654%   hook must be used if  a   completely  different output format is
 2655%   desired.
 2656
 2657%!  disable_client_cache
 2658%
 2659%   Make sure the client will not cache our page.
 2660%
 2661%   @see http://stackoverflow.com/questions/49547/making-sure-a-web-page-is-not-cached-across-all-browsers
 2662
 2663disable_client_cache :-
 2664    format('Cache-Control: no-cache, no-store, must-revalidate\r\n\c
 2665            Pragma: no-cache\r\n\c
 2666            Expires: 0\r\n').
 2667
 2668event_term_to_json_data(Event, JSON, Lang) :-
 2669    event_to_json(Event, JSON, Lang),
 2670    !.
 2671event_term_to_json_data(success(ID, Bindings0, Projection, Time, More),
 2672                        json{event:success, id:ID, time:Time,
 2673                             data:Bindings, more:More, projection:Projection},
 2674                        json) :-
 2675    !,
 2676    term_to_json(Bindings0, Bindings).
 2677event_term_to_json_data(destroy(ID, Event),
 2678                        json{event:destroy, id:ID, data:JSON},
 2679                        Style) :-
 2680    !,
 2681    event_term_to_json_data(Event, JSON, Style).
 2682event_term_to_json_data(create(ID, Features0), JSON, Style) :-
 2683    !,
 2684    (   select(answer(First0), Features0, Features1)
 2685    ->  event_term_to_json_data(First0, First, Style),
 2686        Features = [answer(First)|Features1]
 2687    ;   Features = Features0
 2688    ),
 2689    dict_create(JSON, json, [event(create), id(ID)|Features]).
 2690event_term_to_json_data(destroy(ID, Event),
 2691                        json{event:destroy, id:ID, data:JSON}, Style) :-
 2692    !,
 2693    event_term_to_json_data(Event, JSON, Style).
 2694event_term_to_json_data(error(ID, ErrorTerm), Error, _Style) :-
 2695    !,
 2696    Error0 = json{event:error, id:ID, data:Message},
 2697    add_error_details(ErrorTerm, Error0, Error),
 2698    message_to_string(ErrorTerm, Message).
 2699event_term_to_json_data(failure(ID, Time),
 2700                        json{event:failure, id:ID, time:Time}, _) :-
 2701    !.
 2702event_term_to_json_data(EventTerm, json{event:F, id:ID}, _) :-
 2703    functor(EventTerm, F, 1),
 2704    !,
 2705    arg(1, EventTerm, ID).
 2706event_term_to_json_data(EventTerm, json{event:F, id:ID, data:JSON}, _) :-
 2707    functor(EventTerm, F, 2),
 2708    arg(1, EventTerm, ID),
 2709    arg(2, EventTerm, Data),
 2710    term_to_json(Data, JSON).
 2711
 2712:- public add_error_details/3. 2713
 2714%%  add_error_details(+Error, +JSON0, -JSON)
 2715%
 2716%   Add format error code and  location   information  to an error. Also
 2717%   used by pengines_io.pl.
 2718
 2719add_error_details(Error, JSON0, JSON) :-
 2720    add_error_code(Error, JSON0, JSON1),
 2721    add_error_location(Error, JSON1, JSON).
 2722
 2723%%  add_error_code(+Error, +JSON0, -JSON) is det.
 2724%
 2725%   Add a =code= field to JSON0 of Error is an ISO error term. The error
 2726%   code is the functor name of  the   formal  part  of the error, e.g.,
 2727%   =syntax_error=,  =type_error=,  etc.   Some    errors   carry   more
 2728%   information:
 2729%
 2730%     - existence_error(Type, Obj)
 2731%     {arg1:Type, arg2:Obj}, where Obj is stringified of it is not
 2732%     atomic.
 2733
 2734add_error_code(error(existence_error(Type, Obj), _), Error0, Error) :-
 2735    atom(Type),
 2736    !,
 2737    to_atomic(Obj, Value),
 2738    Error = Error0.put(_{code:existence_error, arg1:Type, arg2:Value}).
 2739add_error_code(error(Formal, _), Error0, Error) :-
 2740    callable(Formal),
 2741    !,
 2742    functor(Formal, Code, _),
 2743    Error = Error0.put(code, Code).
 2744add_error_code(_, Error, Error).
 2745
 2746% What to do with large integers?
 2747to_atomic(Obj, Atomic) :- atom(Obj),   !, Atomic = Obj.
 2748to_atomic(Obj, Atomic) :- number(Obj), !, Atomic = Obj.
 2749to_atomic(Obj, Atomic) :- string(Obj), !, Atomic = Obj.
 2750to_atomic(Obj, Atomic) :- term_string(Obj, Atomic).
 2751
 2752
 2753%%  add_error_location(+Error, +JSON0, -JSON) is det.
 2754%
 2755%   Add a =location= property if the  error   can  be  associated with a
 2756%   source location. The location is an   object  with properties =file=
 2757%   and =line= and, if available, the character location in the line.
 2758
 2759add_error_location(error(_, file(Path, Line, -1, _CharNo)), Term0, Term) :-
 2760    atom(Path), integer(Line),
 2761    !,
 2762    Term = Term0.put(_{location:_{file:Path, line:Line}}).
 2763add_error_location(error(_, file(Path, Line, Ch, _CharNo)), Term0, Term) :-
 2764    atom(Path), integer(Line), integer(Ch),
 2765    !,
 2766    Term = Term0.put(_{location:_{file:Path, line:Line, ch:Ch}}).
 2767add_error_location(_, Term, Term).
 2768
 2769
 2770%!  event_to_json(+Event, -JSONTerm, +Lang) is semidet.
 2771%
 2772%   Hook that translates a Pengine event  structure into a term suitable
 2773%   for reply_json/1, according to the language specification Lang. This
 2774%   can be used to massage general Prolog terms, notably associated with
 2775%   `success(ID, Bindings, Projection,  Time,   More)`  and  `output(ID,
 2776%   Term)` into a format suitable for processing at the client side.
 2777
 2778%:- multifile pengines:event_to_json/3.
 2779
 2780
 2781                 /*******************************
 2782                 *        ACCESS CONTROL        *
 2783                 *******************************/
 2784
 2785%!  allowed(+Request, +Application) is det.
 2786%
 2787%   Check whether the peer is allowed to connect.  Returns a
 2788%   =forbidden= header if contact is not allowed.
 2789
 2790allowed(Request, Application) :-
 2791    setting(Application:allow_from, Allow),
 2792    match_peer(Request, Allow),
 2793    setting(Application:deny_from, Deny),
 2794    \+ match_peer(Request, Deny),
 2795    !.
 2796allowed(Request, _Application) :-
 2797    memberchk(request_uri(Here), Request),
 2798    throw(http_reply(forbidden(Here))).
 2799
 2800match_peer(_, Allowed) :-
 2801    memberchk(*, Allowed),
 2802    !.
 2803match_peer(_, []) :- !, fail.
 2804match_peer(Request, Allowed) :-
 2805    http_peer(Request, Peer),
 2806    debug(pengine(allow), 'Peer: ~q, Allow: ~q', [Peer, Allowed]),
 2807    (   memberchk(Peer, Allowed)
 2808    ->  true
 2809    ;   member(Pattern, Allowed),
 2810        match_peer_pattern(Pattern, Peer)
 2811    ).
 2812
 2813match_peer_pattern(Pattern, Peer) :-
 2814    ip_term(Pattern, IP),
 2815    ip_term(Peer, IP),
 2816    !.
 2817
 2818ip_term(Peer, Pattern) :-
 2819    split_string(Peer, ".", "", PartStrings),
 2820    ip_pattern(PartStrings, Pattern).
 2821
 2822ip_pattern([], []).
 2823ip_pattern([*], _) :- !.
 2824ip_pattern([S|T0], [N|T]) :-
 2825    number_string(N, S),
 2826    ip_pattern(T0, T).
 2827
 2828
 2829%%  authenticate(+Request, +Application, -UserOptions:list) is det.
 2830%
 2831%   Call authentication_hook/3, returning either `[user(User)]`, `[]` or
 2832%   an exception.
 2833
 2834authenticate(Request, Application, UserOptions) :-
 2835    authentication_hook(Request, Application, User),
 2836    !,
 2837    must_be(ground, User),
 2838    UserOptions = [user(User)].
 2839authenticate(_, _, []).
 2840
 2841%%  authentication_hook(+Request, +Application, -User) is semidet.
 2842%
 2843%   This hook is called  from  the   =/pengine/create=  HTTP  handler to
 2844%   discover whether the server is accessed   by  an authorized user. It
 2845%   can react in three ways:
 2846%
 2847%     - Succeed, binding User to a ground term.  The authentity of the
 2848%       user is available through pengine_user/1.
 2849%     - Fail.  The =/create= succeeds, but the pengine is not associated
 2850%       with a user.
 2851%     - Throw an exception to prevent creation of the pengine.  Two
 2852%       meaningful exceptions are:
 2853%         - throw(http_reply(authorise(basic(Realm))))
 2854%         Start a normal HTTP login challenge (reply 401)
 2855%         - throw(http_reply(forbidden(Path))))
 2856%         Reject the request using a 403 repply.
 2857%
 2858%   @see http_authenticate/3 can be used to implement this hook using
 2859%        default HTTP authentication data.
 2860
 2861pengine_register_user(Options) :-
 2862    option(user(User), Options),
 2863    !,
 2864    pengine_self(Me),
 2865    asserta(pengine_user(Me, User)).
 2866pengine_register_user(_).
 2867
 2868
 2869%%  pengine_user(-User) is semidet.
 2870%
 2871%   True when the pengine was create by  an HTTP request that authorized
 2872%   User.
 2873%
 2874%   @see authentication_hook/3 can be used to extract authorization from
 2875%        the HTTP header.
 2876
 2877pengine_user(User) :-
 2878    pengine_self(Me),
 2879    pengine_user(Me, User).
 2880
 2881%!  reply_options(+Request, +Methods) is semidet.
 2882%
 2883%   Reply the HTTP OPTIONS request
 2884
 2885reply_options(Request, Allowed) :-
 2886    option(method(options), Request),
 2887    !,
 2888    cors_enable(Request,
 2889                [ methods(Allowed)
 2890                ]),
 2891    format('Content-type: text/plain\r\n'),
 2892    format('~n').                   % empty body
 2893
 2894
 2895                 /*******************************
 2896                 *        COMPILE SOURCE        *
 2897                 *******************************/
 2898
 2899/** pengine_src_text(+SrcText, +Module) is det
 2900
 2901Asserts the clauses defined in SrcText in   the  private database of the
 2902current Pengine. This  predicate  processes   the  `src_text'  option of
 2903pengine_create/1.
 2904*/
 2905
 2906pengine_src_text(Src, Module) :-
 2907    pengine_self(Self),
 2908    format(atom(ID), 'pengine://~w/src', [Self]),
 2909    extra_load_options(Self, Options),
 2910    setup_call_cleanup(
 2911        open_chars_stream(Src, Stream),
 2912        load_files(Module:ID,
 2913                   [ stream(Stream),
 2914                     module(Module),
 2915                     silent(true)
 2916                   | Options
 2917                   ]),
 2918        close(Stream)),
 2919    keep_source(Self, ID, Src).
 2920
 2921system:'#file'(File, _Line) :-
 2922    prolog_load_context(stream, Stream),
 2923    set_stream(Stream, file_name(File)),
 2924    set_stream(Stream, record_position(false)),
 2925    set_stream(Stream, record_position(true)).
 2926
 2927%%   pengine_src_url(+URL, +Module) is det
 2928%
 2929%    Asserts the clauses defined in URL in   the private database of the
 2930%    current Pengine. This predicate processes   the `src_url' option of
 2931%    pengine_create/1.
 2932%
 2933%    @tbd: make a sensible guess at the encoding.
 2934
 2935pengine_src_url(URL, Module) :-
 2936    pengine_self(Self),
 2937    uri_encoded(path, URL, Path),
 2938    format(atom(ID), 'pengine://~w/url/~w', [Self, Path]),
 2939    extra_load_options(Self, Options),
 2940    (   get_pengine_application(Self, Application),
 2941        setting(Application:debug_info, false)
 2942    ->  setup_call_cleanup(
 2943            http_open(URL, Stream, []),
 2944            ( set_stream(Stream, encoding(utf8)),
 2945              load_files(Module:ID,
 2946                         [ stream(Stream),
 2947                           module(Module)
 2948                         | Options
 2949                         ])
 2950            ),
 2951            close(Stream))
 2952    ;   setup_call_cleanup(
 2953            http_open(URL, TempStream, []),
 2954            ( set_stream(TempStream, encoding(utf8)),
 2955              read_string(TempStream, _, Src)
 2956            ),
 2957            close(TempStream)),
 2958        setup_call_cleanup(
 2959            open_chars_stream(Src, Stream),
 2960            load_files(Module:ID,
 2961                       [ stream(Stream),
 2962                         module(Module)
 2963                       | Options
 2964                       ]),
 2965            close(Stream)),
 2966        keep_source(Self, ID, Src)
 2967    ).
 2968
 2969
 2970extra_load_options(Pengine, Options) :-
 2971    pengine_not_sandboxed(Pengine),
 2972    !,
 2973    Options = [].
 2974extra_load_options(_, [sandboxed(true)]).
 2975
 2976
 2977keep_source(Pengine, ID, SrcText) :-
 2978    get_pengine_application(Pengine, Application),
 2979    setting(Application:debug_info, true),
 2980    !,
 2981    to_string(SrcText, SrcString),
 2982    assertz(pengine_data(Pengine, source(ID, SrcString))).
 2983keep_source(_, _, _).
 2984
 2985to_string(String, String) :-
 2986    string(String),
 2987    !.
 2988to_string(Atom, String) :-
 2989    atom_string(Atom, String),
 2990    !.
 2991
 2992		 /*******************************
 2993		 *            SANDBOX		*
 2994		 *******************************/
 2995
 2996:- multifile
 2997    sandbox:safe_primitive/1. 2998
 2999sandbox:safe_primitive(pengines:pengine_input(_, _)).
 3000sandbox:safe_primitive(pengines:pengine_output(_)).
 3001sandbox:safe_primitive(pengines:pengine_debug(_,_)).
 3002
 3003
 3004                 /*******************************
 3005                 *            MESSAGES          *
 3006                 *******************************/
 3007
 3008prolog:error_message(sandbox(time_limit_exceeded, Limit)) -->
 3009    [ 'Could not prove safety of your goal within ~f seconds.'-[Limit], nl,
 3010      'This is normally caused by an insufficiently instantiated'-[], nl,
 3011      'meta-call (e.g., call(Var)) for which it is too expensive to'-[], nl,
 3012      'find all possible instantations of Var.'-[]
 3013    ]