View source with formatted comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jan Wielemaker
    4    E-mail:        J.Wielemaker@vu.nl
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2002-2018, University of Amsterdam
    7                              VU University Amsterdam
    8                              CWI, Amsterdam
    9    All rights reserved.
   10
   11    Redistribution and use in source and binary forms, with or without
   12    modification, are permitted provided that the following conditions
   13    are met:
   14
   15    1. Redistributions of source code must retain the above copyright
   16       notice, this list of conditions and the following disclaimer.
   17
   18    2. Redistributions in binary form must reproduce the above copyright
   19       notice, this list of conditions and the following disclaimer in
   20       the documentation and/or other materials provided with the
   21       distribution.
   22
   23    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   24    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   25    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   26    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   27    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   28    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   29    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   30    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   31    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   32    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   33    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   34    POSSIBILITY OF SUCH DAMAGE.
   35*/
   36
   37:- module(thread_httpd,
   38          [ http_current_server/2,      % ?:Goal, ?Port
   39            http_server_property/2,     % ?Port, ?Property
   40            http_server/2,              % :Goal, +Options
   41            http_workers/2,             % +Port, ?WorkerCount
   42            http_add_worker/2,          % +Port, +Options
   43            http_current_worker/2,      % ?Port, ?ThreadID
   44            http_stop_server/2,         % +Port, +Options
   45            http_spawn/2,               % :Goal, +Options
   46
   47            http_requeue/1,             % +Request
   48            http_close_connection/1,    % +Request
   49            http_enough_workers/3       % +Queue, +Why, +Peer
   50          ]).   51:- use_module(library(debug)).   52:- use_module(library(error)).   53:- use_module(library(option)).   54:- use_module(library(socket)).   55:- use_module(library(thread_pool)).   56:- use_module(library(gensym)).   57:- use_module(http_wrapper).   58:- use_module(http_path).   59
   60
   61:- predicate_options(http_server/2, 2,
   62                     [ port(any),
   63                       entry_page(atom),
   64                       tcp_socket(any),
   65                       workers(positive_integer),
   66                       timeout(number),
   67                       keep_alive_timeout(number),
   68                       silent(boolean),
   69                       ssl(list(any)),  % if http/http_ssl_plugin is loaded
   70                       pass_to(system:thread_create/3, 3)
   71                     ]).   72:- predicate_options(http_spawn/2, 2,
   73                     [ pool(atom),
   74                       pass_to(system:thread_create/3, 3),
   75                       pass_to(thread_pool:thread_create_in_pool/4, 4)
   76                     ]).   77:- predicate_options(http_add_worker/2, 2,
   78                     [ timeout(number),
   79                       keep_alive_timeout(number),
   80                       max_idle_time(number),
   81                       pass_to(system:thread_create/3, 3)
   82                     ]).   83
   84/** <module> Threaded HTTP server
   85
   86This library defines the HTTP server  frontend of choice for SWI-Prolog.
   87It is based on the multi-threading   capabilities of SWI-Prolog and thus
   88exploits multiple cores  to  serve   requests  concurrently.  The server
   89scales well and can cooperate with   library(thread_pool) to control the
   90number of concurrent requests of a given   type.  For example, it can be
   91configured to handle 200 file download requests concurrently, 2 requests
   92that potentially use  a lot of memory and   8 requests that use a lot of
   93CPU resources.
   94
   95On   Unix   systems,    this    library     can    be    combined   with
   96library(http/http_unix_daemon) to realise a proper  Unix service process
   97that creates a web server at  port   80,  runs under a specific account,
   98optionally detaches from the controlling terminal, etc.
   99
  100Combined with library(http/http_ssl_plugin) from the   SSL package, this
  101library   can   be   used   to    create     an    HTTPS   server.   See
  102<plbase>/doc/packages/examples/ssl/https for an example   server using a
  103self-signed SSL certificate.
  104*/
  105
  106:- meta_predicate
  107    http_server(1, :),
  108    http_current_server(1, ?),
  109    http_spawn(0, +).  110
  111:- dynamic
  112    current_server/6,       % Port, Goal, Thread, Queue, Scheme, StartTime
  113    queue_worker/2,         % Queue, ThreadID
  114    queue_options/2.        % Queue, Options
  115
  116:- multifile
  117    make_socket_hook/3,
  118    accept_hook/2,
  119    close_hook/1,
  120    open_client_hook/6,
  121    http:create_pool/1,
  122    http:schedule_workers/1.  123
  124%!  http_server(:Goal, :Options) is det.
  125%
  126%   Create a server at Port that calls Goal for each parsed request.
  127%   Options provide a list of options. Defined options are
  128%
  129%     * port(?Address)
  130%     Port to bind to.  Address is either a port or a term
  131%     Host:Port. The port may be a variable, causing the system
  132%     to select a free port.  See tcp_bind/2.
  133%
  134%     * entry_page(+URI)
  135%     Affects the message printed while the server is started.
  136%     Interpreted as a URI relative to the server root.
  137%
  138%     * tcp_socket(+Socket)
  139%     If provided, use this socket instead of the creating one and
  140%     binding it to an address.  The socket must be bound to an
  141%     address.
  142%
  143%     * workers(+Count)
  144%     Determine the number of worker threads.  Default is 5.  This
  145%     is fine for small scale usage.  Public servers typically need
  146%     a higher number.
  147%
  148%     * timeout(+Seconds)
  149%     Maximum time of inactivity trying to read the request after a
  150%     connection has been opened.  Default is 60 seconds.  See
  151%     set_stream/1 using the _timeout_ option.
  152%
  153%     * keep_alive_timeout(+Seconds)
  154%     Time to keep `Keep alive' connections alive.  Default is
  155%     2 seconds.
  156%
  157%     * stack_limit(+Bytes)
  158%     Stack limit to use for the workers.  The default is inherited
  159%     from the `main` thread.
  160%     If you need to control resource usage you may consider the
  161%     `spawn` option of http_handler/3 and library(thread_pool).
  162%
  163%     * silent(Bool)
  164%     If `true` (default `false`), do not print an informational
  165%     message that the server was started.
  166%
  167%   A  typical  initialization  for  an    HTTP   server  that  uses
  168%   http_dispatch/1 to relay requests to predicates is:
  169%
  170%     ==
  171%     :- use_module(library(http/thread_httpd)).
  172%     :- use_module(library(http/http_dispatch)).
  173%
  174%     start_server(Port) :-
  175%         http_server(http_dispatch, [port(Port)]).
  176%     ==
  177%
  178%   Note that multiple servers  can  coexist   in  the  same  Prolog
  179%   process. A notable application of this is   to have both an HTTP
  180%   and HTTPS server, where the HTTP   server redirects to the HTTPS
  181%   server for handling sensitive requests.
  182
  183http_server(Goal, M:Options0) :-
  184    option(port(Port), Options0),
  185    !,
  186    make_socket(Port, M:Options0, Options),
  187    create_workers(Options),
  188    create_server(Goal, Port, Options),
  189    (   option(silent(true), Options0)
  190    ->  true
  191    ;   print_message(informational,
  192                      httpd_started_server(Port, Options0))
  193    ).
  194http_server(_Goal, _Options) :-
  195    existence_error(option, port).
  196
  197
  198%!  make_socket(?Port, :OptionsIn, -OptionsOut) is det.
  199%
  200%   Create the HTTP server socket and  worker pool queue. OptionsOut
  201%   is quaranteed to hold the option queue(QueueId).
  202%
  203%   @arg   OptionsIn   is   qualified   to     allow   passing   the
  204%   module-sensitive ssl option argument.
  205
  206make_socket(Port, Options0, Options) :-
  207    make_socket_hook(Port, Options0, Options),
  208    !.
  209make_socket(Port, _:Options0, Options) :-
  210    option(tcp_socket(_), Options0),
  211    !,
  212    make_addr_atom('httpd', Port, Queue),
  213    Options = [ queue(Queue)
  214              | Options0
  215              ].
  216make_socket(Port, _:Options0, Options) :-
  217    tcp_socket(Socket),
  218    tcp_setopt(Socket, reuseaddr),
  219    tcp_bind(Socket, Port),
  220    tcp_listen(Socket, 5),
  221    make_addr_atom('httpd', Port, Queue),
  222    Options = [ queue(Queue),
  223                tcp_socket(Socket)
  224              | Options0
  225              ].
  226
  227%!  make_addr_atom(+Scheme, +Address, -Atom) is det.
  228%
  229%   Create an atom that identifies  the   server's  queue and thread
  230%   resources.
  231
  232make_addr_atom(Scheme, Address, Atom) :-
  233    phrase(address_parts(Address), Parts),
  234    atomic_list_concat([Scheme,@|Parts], Atom).
  235
  236address_parts(Atomic) -->
  237    { atomic(Atomic) },
  238    !,
  239    [Atomic].
  240address_parts(Host:Port) -->
  241    !,
  242    address_parts(Host), [:], address_parts(Port).
  243address_parts(ip(A,B,C,D)) -->
  244    !,
  245    [ A, '.', B, '.', C, '.', D ].
  246
  247%!  create_server(:Goal, +Address, +Options) is det.
  248%
  249%   Create the main server thread that runs accept_server/2 to
  250%   listen to new requests.
  251
  252create_server(Goal, Address, Options) :-
  253    get_time(StartTime),
  254    memberchk(queue(Queue), Options),
  255    scheme(Scheme, Options),
  256    address_port(Address, Port),
  257    make_addr_atom(Scheme, Port, Alias),
  258    thread_self(Initiator),
  259    thread_create(accept_server(Goal, Initiator, Options), _,
  260                  [ alias(Alias)
  261                  ]),
  262    thread_get_message(server_started),
  263    assert(current_server(Port, Goal, Alias, Queue, Scheme, StartTime)).
  264
  265scheme(Scheme, Options) :-
  266    option(scheme(Scheme), Options),
  267    !.
  268scheme(Scheme, Options) :-
  269    (   option(ssl(_), Options)
  270    ;   option(ssl_instance(_), Options)
  271    ),
  272    !,
  273    Scheme = https.
  274scheme(http, _).
  275
  276address_port(_Host:Port, Port) :- !.
  277address_port(Port, Port).
  278
  279
  280%!  http_current_server(:Goal, ?Port) is nondet.
  281%
  282%   True if Goal is the goal of a server at Port.
  283%
  284%   @deprecated Use http_server_property(Port, goal(Goal))
  285
  286http_current_server(Goal, Port) :-
  287    current_server(Port, Goal, _, _, _, _).
  288
  289
  290%!  http_server_property(?Port, ?Property) is nondet.
  291%
  292%   True if Property is a property of the HTTP server running at
  293%   Port.  Defined properties are:
  294%
  295%       * goal(:Goal)
  296%       Goal used to start the server. This is often
  297%       http_dispatch/1.
  298%       * scheme(-Scheme)
  299%       Scheme is one of `http` or `https`.
  300%       * start_time(?Time)
  301%       Time-stamp when the server was created.
  302
  303http_server_property(_:Port, Property) :-
  304    integer(Port),
  305    !,
  306    server_property(Property, Port).
  307http_server_property(Port, Property) :-
  308    server_property(Property, Port).
  309
  310server_property(goal(Goal), Port) :-
  311    current_server(Port, Goal, _, _, _, _).
  312server_property(scheme(Scheme), Port) :-
  313    current_server(Port, _, _, _, Scheme, _).
  314server_property(start_time(Time), Port) :-
  315    current_server(Port, _, _, _, _, Time).
  316
  317
  318%!  http_workers(+Port, -Workers) is det.
  319%!  http_workers(+Port, +Workers:int) is det.
  320%
  321%   Query or set the number of workers  for the server at this port.
  322%   The number of workers is dynamically   modified. Setting it to 1
  323%   (one) can be used to profile the worker using tprofile/1.
  324
  325http_workers(Port, Workers) :-
  326    must_be(ground, Port),
  327    current_server(Port, _, _, Queue, _, _),
  328    !,
  329    (   integer(Workers)
  330    ->  resize_pool(Queue, Workers)
  331    ;   findall(W, queue_worker(Queue, W), WorkerIDs),
  332        length(WorkerIDs, Workers)
  333    ).
  334http_workers(Port, _) :-
  335    existence_error(http_server, Port).
  336
  337
  338%!  http_add_worker(+Port, +Options) is det.
  339%
  340%   Add a new worker to  the  HTTP   server  for  port Port. Options
  341%   overrule the default queue  options.   The  following additional
  342%   options are processed:
  343%
  344%     - max_idle_time(+Seconds)
  345%     The created worker will automatically terminate if there is
  346%     no new work within Seconds.
  347
  348http_add_worker(Port, Options) :-
  349    must_be(ground, Port),
  350    current_server(Port, _, _, Queue, _, _),
  351    !,
  352    queue_options(Queue, QueueOptions),
  353    merge_options(Options, QueueOptions, WorkerOptions),
  354    atom_concat(Queue, '_', AliasBase),
  355    create_workers(1, 1, Queue, AliasBase, WorkerOptions).
  356http_add_worker(Port, _) :-
  357    existence_error(http_server, Port).
  358
  359
  360%!  http_current_worker(?Port, ?ThreadID) is nondet.
  361%
  362%   True if ThreadID is the identifier   of  a Prolog thread serving
  363%   Port. This predicate is  motivated  to   allow  for  the  use of
  364%   arbitrary interaction with the worker thread for development and
  365%   statistics.
  366
  367http_current_worker(Port, ThreadID) :-
  368    current_server(Port, _, _, Queue, _, _),
  369    queue_worker(Queue, ThreadID).
  370
  371
  372%!  accept_server(:Goal, +Initiator, +Options)
  373%
  374%   The goal of a small server-thread accepting new requests and
  375%   posting them to the queue of workers.
  376
  377accept_server(Goal, Initiator, Options) :-
  378    catch(accept_server2(Goal, Initiator, Options), http_stop, true),
  379    thread_self(Thread),
  380    retract(current_server(_Port, _, Thread, _Queue, _Scheme, _StartTime)),
  381    close_server_socket(Options).
  382
  383accept_server2(Goal, Initiator, Options) :-
  384    thread_send_message(Initiator, server_started),
  385    repeat,
  386      (   catch(accept_server3(Goal, Options), E, true)
  387      ->  (   var(E)
  388          ->  fail
  389          ;   accept_rethrow_error(E)
  390          ->  throw(E)
  391          ;   print_message(error, E),
  392              fail
  393          )
  394      ;   print_message(error,      % internal error
  395                        goal_failed(accept_server3(Goal, Options))),
  396          fail
  397      ).
  398
  399accept_server3(Goal, Options) :-
  400    accept_hook(Goal, Options),
  401    !.
  402accept_server3(Goal, Options) :-
  403    memberchk(tcp_socket(Socket), Options),
  404    memberchk(queue(Queue), Options),
  405    tcp_accept(Socket, Client, Peer),
  406    debug(http(connection), 'New HTTP connection from ~p', [Peer]),
  407    thread_send_message(Queue, tcp_client(Client, Goal, Peer)),
  408    http_enough_workers(Queue, accept, Peer).
  409
  410accept_rethrow_error(http_stop).
  411accept_rethrow_error('$aborted').
  412
  413
  414%!  close_server_socket(+Options)
  415%
  416%   Close the server socket.
  417
  418close_server_socket(Options) :-
  419    close_hook(Options),
  420    !.
  421close_server_socket(Options) :-
  422    memberchk(tcp_socket(Socket), Options),
  423    !,
  424    tcp_close_socket(Socket).
  425
  426
  427%!  http_stop_server(+Port, +Options)
  428%
  429%   Stop the indicated  HTTP  server   gracefully.  First  stops all
  430%   workers, then stops the server.
  431%
  432%   @tbd    Realise non-graceful stop
  433
  434http_stop_server(Host:Port, Options) :-         % e.g., localhost:4000
  435    ground(Host),
  436    !,
  437    http_stop_server(Port, Options).
  438http_stop_server(Port, _Options) :-
  439    http_workers(Port, 0),                  % checks Port is ground
  440    current_server(Port, _, Thread, Queue, _Scheme, _Start),
  441    retractall(queue_options(Queue, _)),
  442    thread_signal(Thread, throw(http_stop)),
  443    catch(connect(localhost:Port), _, true),
  444    thread_join(Thread, _),
  445    message_queue_destroy(Queue).
  446
  447connect(Address) :-
  448    setup_call_cleanup(
  449        tcp_socket(Socket),
  450        tcp_connect(Socket, Address),
  451        tcp_close_socket(Socket)).
  452
  453%!  http_enough_workers(+Queue, +Why, +Peer) is det.
  454%
  455%   Check that we have enough workers in our queue. If not, call the
  456%   hook http:schedule_workers/1 to extend  the   worker  pool. This
  457%   predicate can be used by accept_hook/2.
  458
  459http_enough_workers(Queue, Why, Peer) :-
  460    message_queue_property(Queue, size(Size)),
  461    (   enough(Size, Why)
  462    ->  true
  463    ;   current_server(Port, _, _, Queue, _, _),
  464        catch(http:schedule_workers(_{port:Port,
  465                                      reason:Why,
  466                                      peer:Peer,
  467                                      waiting:Size,
  468                                      queue:Queue
  469                                     }),
  470              Error,
  471              print_message(error, Error))
  472    ->  true
  473    ;   true
  474    ).
  475
  476enough(0, _).
  477enough(1, keep_alive).                  % I will be ready myself
  478
  479
  480%!  http:schedule_workers(+Data:dict) is semidet.
  481%
  482%   Hook called if a  new  connection   or  a  keep-alive connection
  483%   cannot be scheduled _immediately_ to a worker. Dict contains the
  484%   following keys:
  485%
  486%     - port:Port
  487%     Port number that identifies the server.
  488%     - reason:Reason
  489%     One of =accept= for a new connection or =keep_alive= if a
  490%     worker tries to reschedule itself.
  491%     - peer:Peer
  492%     Identify the other end of the connection
  493%     - waiting:Size
  494%     Number of messages waiting in the queue.
  495%     - queue:Queue
  496%     Message queue used to dispatch accepted messages.
  497%
  498%   Note that, when called with `reason:accept`,   we  are called in
  499%   the time critical main accept loop.   An  implementation of this
  500%   hook shall typically send  the  event   to  thread  dedicated to
  501%   dynamic worker-pool management.
  502%
  503%   @see    http_add_worker/2 may be used to create (temporary) extra
  504%           workers.
  505
  506
  507                 /*******************************
  508                 *    WORKER QUEUE OPERATIONS   *
  509                 *******************************/
  510
  511%!  create_workers(+Options)
  512%
  513%   Create the pool of HTTP worker-threads. Each worker has the
  514%   alias http_worker_N.
  515
  516create_workers(Options) :-
  517    option(workers(N), Options, 5),
  518    option(queue(Queue), Options),
  519    catch(message_queue_create(Queue), _, true),
  520    atom_concat(Queue, '_', AliasBase),
  521    create_workers(1, N, Queue, AliasBase, Options),
  522    assert(queue_options(Queue, Options)).
  523
  524create_workers(I, N, _, _, _) :-
  525    I > N,
  526    !.
  527create_workers(I, N, Queue, AliasBase, Options) :-
  528    gensym(AliasBase, Alias),
  529    thread_create(http_worker(Options), Id,
  530                  [ alias(Alias)
  531                  | Options
  532                  ]),
  533    assertz(queue_worker(Queue, Id)),
  534    I2 is I + 1,
  535    create_workers(I2, N, Queue, AliasBase, Options).
  536
  537
  538%!  resize_pool(+Queue, +Workers) is det.
  539%
  540%   Create or destroy workers. If workers   are  destroyed, the call
  541%   waits until the desired number of waiters is reached.
  542
  543resize_pool(Queue, Size) :-
  544    findall(W, queue_worker(Queue, W), Workers),
  545    length(Workers, Now),
  546    (   Now < Size
  547    ->  queue_options(Queue, Options),
  548        atom_concat(Queue, '_', AliasBase),
  549        I0 is Now+1,
  550        create_workers(I0, Size, Queue, AliasBase, Options)
  551    ;   Now == Size
  552    ->  true
  553    ;   Now > Size
  554    ->  Excess is Now - Size,
  555        thread_self(Me),
  556        forall(between(1, Excess, _), thread_send_message(Queue, quit(Me))),
  557        forall(between(1, Excess, _), thread_get_message(quitted(_)))
  558    ).
  559
  560
  561%!  http_worker(+Options)
  562%
  563%   Run HTTP worker main loop. Workers   simply  wait until they are
  564%   passed an accepted socket to process  a client.
  565%
  566%   If the message quit(Sender) is read   from the queue, the worker
  567%   stops.
  568
  569http_worker(Options) :-
  570    thread_at_exit(done_worker),
  571    option(queue(Queue), Options),
  572    option(max_idle_time(MaxIdle), Options, infinite),
  573    repeat,
  574      garbage_collect,
  575      trim_stacks,
  576      debug(http(worker), 'Waiting for a job ...', []),
  577      (   MaxIdle == infinite
  578      ->  thread_get_message(Queue, Message)
  579      ;   thread_get_message(Queue, Message, [timeout(MaxIdle)])
  580      ->  true
  581      ;   Message = quit(idle)
  582      ),
  583      debug(http(worker), 'Got job ~p', [Message]),
  584      (   Message = quit(Sender)
  585      ->  !,
  586          thread_self(Self),
  587          thread_detach(Self),
  588          (   Sender == idle
  589          ->  true
  590          ;   retract(queue_worker(Queue, Self)),
  591              thread_send_message(Sender, quitted(Self))
  592          )
  593      ;   open_client(Message, Queue, Goal, In, Out,
  594                      Options, ClientOptions),
  595          (   catch(http_process(Goal, In, Out, ClientOptions),
  596                    Error, true)
  597          ->  true
  598          ;   Error = goal_failed(http_process/4)
  599          ),
  600          (   var(Error)
  601          ->  fail
  602          ;   current_message_level(Error, Level),
  603              print_message(Level, Error),
  604              memberchk(peer(Peer), ClientOptions),
  605              close_connection(Peer, In, Out),
  606              fail
  607          )
  608      ).
  609
  610
  611%!  open_client(+Message, +Queue, -Goal, -In, -Out,
  612%!              +Options, -ClientOptions) is semidet.
  613%
  614%   Opens the connection to the client in a worker from the message
  615%   sent to the queue by accept_server/2.
  616
  617open_client(requeue(In, Out, Goal, ClOpts),
  618            _, Goal, In, Out, Opts, ClOpts) :-
  619    !,
  620    memberchk(peer(Peer), ClOpts),
  621    option(keep_alive_timeout(KeepAliveTMO), Opts, 2),
  622    check_keep_alive_connection(In, KeepAliveTMO, Peer, In, Out).
  623open_client(Message, Queue, Goal, In, Out, Opts,
  624            [ pool(client(Queue, Goal, In, Out)),
  625              timeout(Timeout)
  626            | Options
  627            ]) :-
  628    catch(open_client(Message, Goal, In, Out, Options, Opts),
  629          E, report_error(E)),
  630    option(timeout(Timeout), Opts, 60),
  631    (   debugging(http(connection))
  632    ->  memberchk(peer(Peer), Options),
  633        debug(http(connection), 'Opened connection from ~p', [Peer])
  634    ;   true
  635    ).
  636
  637
  638%!  open_client(+Message, +Goal, -In, -Out,
  639%!              -ClientOptions, +Options) is det.
  640
  641open_client(Message, Goal, In, Out, ClientOptions, Options) :-
  642    open_client_hook(Message, Goal, In, Out, ClientOptions, Options),
  643    !.
  644open_client(tcp_client(Socket, Goal, Peer), Goal, In, Out,
  645            [ peer(Peer),
  646              protocol(http)
  647            ], _) :-
  648    tcp_open_socket(Socket, In, Out).
  649
  650report_error(E) :-
  651    print_message(error, E),
  652    fail.
  653
  654
  655%!  check_keep_alive_connection(+In, +TimeOut, +Peer, +In, +Out) is semidet.
  656%
  657%   Wait for the client for at most  TimeOut seconds. Succeed if the
  658%   client starts a new request within   this  time. Otherwise close
  659%   the connection and fail.
  660
  661check_keep_alive_connection(In, TMO, Peer, In, Out) :-
  662    stream_property(In, timeout(Old)),
  663    set_stream(In, timeout(TMO)),
  664    debug(http(keep_alive), 'Waiting for keep-alive ...', []),
  665    catch(peek_code(In, Code), E, true),
  666    (   var(E),                     % no exception
  667        Code \== -1                 % no end-of-file
  668    ->  set_stream(In, timeout(Old)),
  669        debug(http(keep_alive), '\tre-using keep-alive connection', [])
  670    ;   (   Code == -1
  671        ->  debug(http(keep_alive), '\tRemote closed keep-alive connection', [])
  672        ;   debug(http(keep_alive), '\tTimeout on keep-alive connection', [])
  673        ),
  674        close_connection(Peer, In, Out),
  675        fail
  676    ).
  677
  678
  679%!  done_worker
  680%
  681%   Called when worker is terminated  due   to  http_workers/2  or a
  682%   (debugging) exception. In  the   latter  case, recreate_worker/2
  683%   creates a new worker.
  684
  685done_worker :-
  686    thread_self(Self),
  687    thread_detach(Self),
  688    retract(queue_worker(Queue, Self)),
  689    thread_property(Self, status(Status)),
  690    !,
  691    (   catch(recreate_worker(Status, Queue), _, fail)
  692    ->  print_message(informational,
  693                      httpd_restarted_worker(Self))
  694    ;   done_status_message_level(Status, Level),
  695        print_message(Level,
  696                      httpd_stopped_worker(Self, Status))
  697    ).
  698done_worker :-                                  % received quit(Sender)
  699    thread_self(Self),
  700    thread_property(Self, status(Status)),
  701    done_status_message_level(Status, Level),
  702    print_message(Level,
  703                  httpd_stopped_worker(Self, Status)).
  704
  705done_status_message_level(true, silent) :- !.
  706done_status_message_level(exception('$aborted'), silent) :- !.
  707done_status_message_level(_, informational).
  708
  709
  710%!  recreate_worker(+Status, +Queue) is semidet.
  711%
  712%   Deal with the possibility  that   threads  are,  during development,
  713%   killed with abort/0. We recreate the worker to avoid that eventually
  714%   we run out of workers. If  we  are   aborted  due  to a halt/0 call,
  715%   thread_create/3 will raise a permission error.
  716%
  717%   The first clause deals with the possibility  that we cannot write to
  718%   `user_error`. This is possible when Prolog   is started as a service
  719%   using some service managers. Would be  nice   if  we  could write an
  720%   error, but where?
  721
  722recreate_worker(exception(error(io_error(write,user_error),_)), _Queue) :-
  723    halt(2).
  724recreate_worker(exception(Error), Queue) :-
  725    recreate_on_error(Error),
  726    queue_options(Queue, Options),
  727    atom_concat(Queue, '_', AliasBase),
  728    create_workers(1, 1, Queue, AliasBase, Options).
  729
  730recreate_on_error('$aborted').
  731recreate_on_error(time_limit_exceeded).
  732
  733%!  thread_httpd:message_level(+Exception, -Level)
  734%
  735%   Determine the message stream used  for   exceptions  that  may occur
  736%   during server_loop/5. Being multifile, clauses can   be added by the
  737%   application to refine error handling.   See  also message_hook/3 for
  738%   further programming error handling.
  739
  740:- multifile
  741    message_level/2.  742
  743message_level(error(io_error(read, _), _),      silent).
  744message_level(error(socket_error(epipe,_), _),	silent).
  745message_level(error(timeout_error(read, _), _), informational).
  746message_level(keep_alive_timeout,               silent).
  747
  748current_message_level(Term, Level) :-
  749    (   message_level(Term, Level)
  750    ->  true
  751    ;   Level = error
  752    ).
  753
  754
  755%!  http_requeue(+Header)
  756%
  757%   Re-queue a connection to  the  worker   pool.  This  deals  with
  758%   processing additional requests on keep-alive connections.
  759
  760http_requeue(Header) :-
  761    requeue_header(Header, ClientOptions),
  762    memberchk(pool(client(Queue, Goal, In, Out)), ClientOptions),
  763    memberchk(peer(Peer), ClientOptions),
  764    http_enough_workers(Queue, keep_alive, Peer),
  765    thread_send_message(Queue, requeue(In, Out, Goal, ClientOptions)),
  766    !.
  767http_requeue(Header) :-
  768    debug(http(error), 'Re-queue failed: ~p', [Header]),
  769    fail.
  770
  771requeue_header([], []).
  772requeue_header([H|T0], [H|T]) :-
  773    requeue_keep(H),
  774    !,
  775    requeue_header(T0, T).
  776requeue_header([_|T0], T) :-
  777    requeue_header(T0, T).
  778
  779requeue_keep(pool(_)).
  780requeue_keep(peer(_)).
  781requeue_keep(protocol(_)).
  782
  783
  784%!  http_process(Message, Queue, +Options)
  785%
  786%   Handle a single client message on the given stream.
  787
  788http_process(Goal, In, Out, Options) :-
  789    debug(http(server), 'Running server goal ~p on ~p -> ~p',
  790          [Goal, In, Out]),
  791    option(timeout(TMO), Options, 60),
  792    set_stream(In, timeout(TMO)),
  793    set_stream(Out, timeout(TMO)),
  794    http_wrapper(Goal, In, Out, Connection,
  795                 [ request(Request)
  796                 | Options
  797                 ]),
  798    next(Connection, Request).
  799
  800next(switch_protocol(SwitchGoal, _SwitchOptions), Request) :-
  801    !,
  802    memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
  803    (   catch(call(SwitchGoal, In, Out), E,
  804              (   print_message(error, E),
  805                  fail))
  806    ->  true
  807    ;   http_close_connection(Request)
  808    ).
  809next(spawned(ThreadId), _) :-
  810    !,
  811    debug(http(spawn), 'Handler spawned to thread ~w', [ThreadId]).
  812next(Connection, Request) :-
  813    downcase_atom(Connection, 'keep-alive'),
  814    http_requeue(Request),
  815    !.
  816next(_, Request) :-
  817    http_close_connection(Request).
  818
  819
  820%!  http_close_connection(+Request)
  821%
  822%   Close connection associated to Request.  See also http_requeue/1.
  823
  824http_close_connection(Request) :-
  825    memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
  826    memberchk(peer(Peer), Request),
  827    close_connection(Peer, In, Out).
  828
  829%!  close_connection(+Peer, +In, +Out)
  830%
  831%   Closes the connection from the server to the client.  Errors are
  832%   currently silently ignored.
  833
  834close_connection(Peer, In, Out) :-
  835    debug(http(connection), 'Closing connection from ~p', [Peer]),
  836    catch(close(In, [force(true)]), _, true),
  837    catch(close(Out, [force(true)]), _, true).
  838
  %!  http_spawn(:Goal, +Options) is det.
  %
  %   Hand the current request over to a new thread that continues
  %   processing it by running Goal.  The calling thread goes back to
  %   the worker pool to serve new requests.  Options are given to
  %   thread_create/3, with one exception:
  %
  %       * pool(+Pool)
  %       Create the thread in Pool using library(thread_pool).
  %
  %   When Pool does not exist the multifile hook http:create_pool/1
  %   is given a chance to create it.  If that works out the spawn is
  %   attempted again.  A pool that has no room for another thread
  %   results in the exception http_reply(busy).

  http_spawn(Goal, Options) :-
      select_option(pool(Pool), Options, ThreadOptions),
      !,
      current_output(CGI),
      PoolThreadOptions = [detached(true)|ThreadOptions],
      catch(thread_create_in_pool(Pool, wrap_spawned(CGI, Goal), Id,
                                  PoolThreadOptions),
            Error,
            true),
      (   var(Error)                      % no exception: thread started
      ->  http_spawned(Id)
      ;   Error = error(resource_error(threads_in_pool(_)), _)
      ->  throw(http_reply(busy))
      ;   Error = error(existence_error(thread_pool, Pool), _),
          create_pool(Pool)               % pool was missing: create, retry
      ->  http_spawn(Goal, Options)
      ;   throw(Error)
      ).
  http_spawn(Goal, Options) :-
      current_output(CGI),
      thread_create(wrap_spawned(CGI, Goal), Id, [detached(true)|Options]),
      http_spawned(Id).
  882
  %!  wrap_spawned(+CGI, :Goal)
  %
  %   Entry point of a thread started by http_spawn/2.  Makes CGI the
  %   current output of this thread, runs Goal through
  %   http_wrap_spawned/3 and passes the resulting connection state
  %   and request to next/2.

  wrap_spawned(CGI, Goal) :-
      set_output(CGI),
      http_wrap_spawned(Goal, SpawnedRequest, ConnectionState),
      next(ConnectionState, SpawnedRequest).
  887
  %!  create_pool(+Pool)
  %
  %   Lazy creation of worker-pools for the HTTP server.  This
  %   predicate first calls the hook http:create_pool/1.  If the hook
  %   fails it creates a default pool of size 10, which should suffice
  %   for most typical use cases, and prints an informational message
  %   once that pool has actually been created.
  %
  %   Creating a pool that already exists raises a permission error.
  %   That error is ignored for both the hook and the default pool, as
  %   it merely tells us that somebody else (e.g., another worker
  %   handling a request for the same pool) created the pool first.
  %   Any other exception is passed on.

  create_pool(Pool) :-
      E = error(permission_error(create, thread_pool, Pool), _),
      catch(http:create_pool(Pool), E, true),
      !.                                  % hook did the job: no default pool
  create_pool(Pool) :-
      E = error(permission_error(create, thread_pool, Pool), _),
      catch(( thread_pool_create(Pool, 10, []),
              % only announce the pool if we really created it
              print_message(informational, httpd(created_pool(Pool)))
            ),
            E,
            true).
  902
  903
  904
  905                 /*******************************
  906                 *            MESSAGES          *
  907                 *******************************/
  908
  % The message translations below extend the system-wide message
  % grammar, which is spread over many files.
  :- multifile prolog:message//1.
  %   Translations for the message terms printed by the HTTP server:
  %   server start, worker stop/replacement and the lazy creation of
  %   a default thread pool.

  prolog:message(httpd_started_server(Port, Options)) -->
      [ 'Started server at '-[] ],
      http_root(Port, Options).
  prolog:message(httpd_stopped_worker(Worker, Status)) -->
      [ 'Stopped worker ~p: ~p'-[Worker, Status] ].
  prolog:message(httpd_restarted_worker(Worker)) -->
      [ 'Replaced aborted worker ~p'-[Worker] ].
  prolog:message(httpd(created_pool(Pool))) -->
      [ 'Created thread-pool ~p of size 10'-[Pool],
        nl,
        'Create this pool at startup-time or define the hook ',
        nl,
        'http:create_pool/1 to avoid this message and create a ',
        nl,
        'pool that fits the usage-profile.'
      ].
  925
  %!  http_root(+Address, +Options)//
  %
  %   Emit the URI computed by landing_page/3 for Address and Options
  %   as a single message line element.

  http_root(Address, Options) -->
      { landing_page(Address, LandingURI, Options) },
      [ '~w'-[LandingURI] ].
  929
  %!  landing_page(+Address, -URI, +Options)
  %
  %   URI is the location of the entry page of the server at Address.
  %   Address is either Host:Port or a plain Port, in which case the
  %   host `localhost` is used.  The scheme is obtained using
  %   http_server_property/2 on Port.  The port is left out of URI if
  %   default_port/2 lists it for that scheme.  The page itself is
  %   resolved by entry_page/3 using Options.
  %
  %   The clause for Host:Port commits.  Without the cut a failure
  %   in its body (e.g., no server is known for Port) or backtracking
  %   into this predicate would reach the catch-all clause, which
  %   re-wraps the address as localhost:(Host:Port) and then recurses
  %   without bound.  Now the predicate simply fails in that case.
  %
  %   must_be/2 raises an error if Host is not an atom.

  landing_page(Host:Port, URI, Options) :-
      !,
      must_be(atom, Host),
      http_server_property(Port, scheme(Scheme)),
      (   default_port(Scheme, Port)
      ->  format(atom(Base), '~w://~w', [Scheme, Host])
      ;   format(atom(Base), '~w://~w:~w', [Scheme, Host, Port])
      ),
      entry_page(Base, URI, Options).
  landing_page(Port, URI, Options) :-
      landing_page(localhost:Port, URI, Options).
  940
  %!  default_port(?Scheme, ?Port)
  %
  %   Table of the ports for which landing_page/3 leaves the port out
  %   of the generated URI.

  default_port(http,  80).
  default_port(https, 443).
  943
  944entry_page(Base, URI, Options) :-
  945    option(entry_page(Entry), Options),
  946    !,
  947    uri_resolve(Entry, Base, URI).
  948entry_page(Base, URI, _) :-
  949    http_absolute_location(root(.), Entry, []),
  950    uri_resolve(Entry, Base, URI)