36
   37:- module(thread_httpd,
   38          [ http_current_server/2,         39            http_server_property/2,        40            http_server/2,                 41            http_workers/2,                42            http_add_worker/2,             43            http_current_worker/2,         44            http_stop_server/2,            45            http_spawn/2,                  46
   47            http_requeue/1,                48            http_close_connection/1,       49            http_enough_workers/3          50          ]).   51:- use_module(library(debug)).   52:- use_module(library(error)).   53:- use_module(library(option)).   54:- use_module(library(socket)).   55:- use_module(library(thread_pool)).   56:- use_module(library(gensym)).   57:- use_module(http_wrapper).   58:- use_module(http_path).   59
   60
   61:- predicate_options(http_server/2, 2,
   62                     [ port(any),
   63                       entry_page(atom),
   64                       tcp_socket(any),
   65                       workers(positive_integer),
   66                       timeout(number),
   67                       keep_alive_timeout(number),
   68                       silent(boolean),
   69                       ssl(list(any)),     70                       pass_to(system:thread_create/3, 3)
   71                     ]).   72:- predicate_options(http_spawn/2, 2,
   73                     [ pool(atom),
   74                       pass_to(system:thread_create/3, 3),
   75                       pass_to(thread_pool:thread_create_in_pool/4, 4)
   76                     ]).   77:- predicate_options(http_add_worker/2, 2,
   78                     [ timeout(number),
   79                       keep_alive_timeout(number),
   80                       max_idle_time(number),
   81                       pass_to(system:thread_create/3, 3)
   82                     ]).   83
  105
  106:- meta_predicate
  107    http_server(1, :),
  108    http_current_server(1, ?),
  109    http_spawn(0, +).  110
  111:- dynamic
  112    current_server/6,         113    queue_worker/2,           114    queue_options/2.          115
  116:- multifile
  117    make_socket_hook/3,
  118    accept_hook/2,
  119    close_hook/1,
  120    open_client_hook/6,
  121    http:create_pool/1,
  122    http:schedule_workers/1.  123
  182
  183http_server(Goal, M:Options0) :-
  184    option(port(Port), Options0),
  185    !,
  186    make_socket(Port, M:Options0, Options),
  187    create_workers(Options),
  188    create_server(Goal, Port, Options),
  189    (   option(silent(true), Options0)
  190    ->  true
  191    ;   print_message(informational,
  192                      httpd_started_server(Port, Options0))
  193    ).
  194http_server(_Goal, _Options) :-
  195    existence_error(option, port).
  196
  197
  205
  206make_socket(Port, Options0, Options) :-
  207    make_socket_hook(Port, Options0, Options),
  208    !.
  209make_socket(Port, _:Options0, Options) :-
  210    option(tcp_socket(_), Options0),
  211    !,
  212    make_addr_atom('httpd', Port, Queue),
  213    Options = [ queue(Queue)
  214              | Options0
  215              ].
  216make_socket(Port, _:Options0, Options) :-
  217    tcp_socket(Socket),
  218    tcp_setopt(Socket, reuseaddr),
  219    tcp_bind(Socket, Port),
  220    tcp_listen(Socket, 5),
  221    make_addr_atom('httpd', Port, Queue),
  222    Options = [ queue(Queue),
  223                tcp_socket(Socket)
  224              | Options0
  225              ].
  226
  231
  232make_addr_atom(Scheme, Address, Atom) :-
  233    phrase(address_parts(Address), Parts),
  234    atomic_list_concat([Scheme,@|Parts], Atom).
  235
  236address_parts(Atomic) -->
  237    { atomic(Atomic) },
  238    !,
  239    [Atomic].
  240address_parts(Host:Port) -->
  241    !,
  242    address_parts(Host), [:], address_parts(Port).
  243address_parts(ip(A,B,C,D)) -->
  244    !,
  245    [ A, '.', B, '.', C, '.', D ].
  246
  251
  252create_server(Goal, Address, Options) :-
  253    get_time(StartTime),
  254    memberchk(queue(Queue), Options),
  255    scheme(Scheme, Options),
  256    address_port(Address, Port),
  257    make_addr_atom(Scheme, Port, Alias),
  258    thread_self(Initiator),
  259    thread_create(accept_server(Goal, Initiator, Options), _,
  260                  [ alias(Alias)
  261                  ]),
  262    thread_get_message(server_started),
  263    assert(current_server(Port, Goal, Alias, Queue, Scheme, StartTime)).
  264
  265scheme(Scheme, Options) :-
  266    option(scheme(Scheme), Options),
  267    !.
  268scheme(Scheme, Options) :-
  269    (   option(ssl(_), Options)
  270    ;   option(ssl_instance(_), Options)
  271    ),
  272    !,
  273    Scheme = https.
  274scheme(http, _).
  275
  276address_port(_Host:Port, Port) :- !.
  277address_port(Port, Port).
  278
  279
  285
  286http_current_server(Goal, Port) :-
  287    current_server(Port, Goal, _, _, _, _).
  288
  289
  302
  303http_server_property(_:Port, Property) :-
  304    integer(Port),
  305    !,
  306    server_property(Property, Port).
  307http_server_property(Port, Property) :-
  308    server_property(Property, Port).
  309
  310server_property(goal(Goal), Port) :-
  311    current_server(Port, Goal, _, _, _, _).
  312server_property(scheme(Scheme), Port) :-
  313    current_server(Port, _, _, _, Scheme, _).
  314server_property(start_time(Time), Port) :-
  315    current_server(Port, _, _, _, _, Time).
  316
  317
  324
  325http_workers(Port, Workers) :-
  326    must_be(ground, Port),
  327    current_server(Port, _, _, Queue, _, _),
  328    !,
  329    (   integer(Workers)
  330    ->  resize_pool(Queue, Workers)
  331    ;   findall(W, queue_worker(Queue, W), WorkerIDs),
  332        length(WorkerIDs, Workers)
  333    ).
  334http_workers(Port, _) :-
  335    existence_error(http_server, Port).
  336
  337
  347
  348http_add_worker(Port, Options) :-
  349    must_be(ground, Port),
  350    current_server(Port, _, _, Queue, _, _),
  351    !,
  352    queue_options(Queue, QueueOptions),
  353    merge_options(Options, QueueOptions, WorkerOptions),
  354    atom_concat(Queue, '_', AliasBase),
  355    create_workers(1, 1, Queue, AliasBase, WorkerOptions).
  356http_add_worker(Port, _) :-
  357    existence_error(http_server, Port).
  358
  359
  366
  367http_current_worker(Port, ThreadID) :-
  368    current_server(Port, _, _, Queue, _, _),
  369    queue_worker(Queue, ThreadID).
  370
  371
  376
  377accept_server(Goal, Initiator, Options) :-
  378    catch(accept_server2(Goal, Initiator, Options), http_stop, true),
  379    thread_self(Thread),
  380    retract(current_server(_Port, _, Thread, _Queue, _Scheme, _StartTime)),
  381    close_server_socket(Options).
  382
  383accept_server2(Goal, Initiator, Options) :-
  384    thread_send_message(Initiator, server_started),
  385    repeat,
  386      (   catch(accept_server3(Goal, Options), E, true)
  387      ->  (   var(E)
  388          ->  fail
  389          ;   accept_rethrow_error(E)
  390          ->  throw(E)
  391          ;   print_message(error, E),
  392              fail
  393          )
  394      ;   print_message(error,        395                        goal_failed(accept_server3(Goal, Options))),
  396          fail
  397      ).
  398
  399accept_server3(Goal, Options) :-
  400    accept_hook(Goal, Options),
  401    !.
  402accept_server3(Goal, Options) :-
  403    memberchk(tcp_socket(Socket), Options),
  404    memberchk(queue(Queue), Options),
  405    tcp_accept(Socket, Client, Peer),
  406    debug(http(connection), 'New HTTP connection from ~p', [Peer]),
  407    thread_send_message(Queue, tcp_client(Client, Goal, Peer)),
  408    http_enough_workers(Queue, accept, Peer).
  409
  410accept_rethrow_error(http_stop).
  411accept_rethrow_error('$aborted').
  412
  413
  417
  418close_server_socket(Options) :-
  419    close_hook(Options),
  420    !.
  421close_server_socket(Options) :-
  422    memberchk(tcp_socket(Socket), Options),
  423    !,
  424    tcp_close_socket(Socket).
  425
  426
  433
  434http_stop_server(Host:Port, Options) :-           435    ground(Host),
  436    !,
  437    http_stop_server(Port, Options).
  438http_stop_server(Port, _Options) :-
  439    http_workers(Port, 0),                    440    current_server(Port, _, Thread, Queue, _Scheme, _Start),
  441    retractall(queue_options(Queue, _)),
  442    thread_signal(Thread, throw(http_stop)),
  443    catch(connect(localhost:Port), _, true),
  444    thread_join(Thread, _),
  445    message_queue_destroy(Queue).
  446
  447connect(Address) :-
  448    setup_call_cleanup(
  449        tcp_socket(Socket),
  450        tcp_connect(Socket, Address),
  451        tcp_close_socket(Socket)).
  452
  458
  459http_enough_workers(Queue, Why, Peer) :-
  460    message_queue_property(Queue, size(Size)),
  461    (   enough(Size, Why)
  462    ->  true
  463    ;   current_server(Port, _, _, Queue, _, _),
  464        catch(http:schedule_workers(_{port:Port,
  465                                      reason:Why,
  466                                      peer:Peer,
  467                                      waiting:Size,
  468                                      queue:Queue
  469                                     }),
  470              Error,
  471              print_message(error, Error))
  472    ->  true
  473    ;   true
  474    ).
  475
  476enough(0, _).
  477enough(1, keep_alive).                    478
  479
  505
  506
  507                   510
  515
  516create_workers(Options) :-
  517    option(workers(N), Options, 5),
  518    option(queue(Queue), Options),
  519    catch(message_queue_create(Queue), _, true),
  520    atom_concat(Queue, '_', AliasBase),
  521    create_workers(1, N, Queue, AliasBase, Options),
  522    assert(queue_options(Queue, Options)).
  523
  524create_workers(I, N, _, _, _) :-
  525    I > N,
  526    !.
  527create_workers(I, N, Queue, AliasBase, Options) :-
  528    gensym(AliasBase, Alias),
  529    thread_create(http_worker(Options), Id,
  530                  [ alias(Alias)
  531                  | Options
  532                  ]),
  533    assertz(queue_worker(Queue, Id)),
  534    I2 is I + 1,
  535    create_workers(I2, N, Queue, AliasBase, Options).
  536
  537
  542
  543resize_pool(Queue, Size) :-
  544    findall(W, queue_worker(Queue, W), Workers),
  545    length(Workers, Now),
  546    (   Now < Size
  547    ->  queue_options(Queue, Options),
  548        atom_concat(Queue, '_', AliasBase),
  549        I0 is Now+1,
  550        create_workers(I0, Size, Queue, AliasBase, Options)
  551    ;   Now == Size
  552    ->  true
  553    ;   Now > Size
  554    ->  Excess is Now - Size,
  555        thread_self(Me),
  556        forall(between(1, Excess, _), thread_send_message(Queue, quit(Me))),
  557        forall(between(1, Excess, _), thread_get_message(quitted(_)))
  558    ).
  559
  560
  568
  569http_worker(Options) :-
  570    thread_at_exit(done_worker),
  571    option(queue(Queue), Options),
  572    option(max_idle_time(MaxIdle), Options, infinite),
  573    repeat,
  574      garbage_collect,
  575      trim_stacks,
  576      debug(http(worker), 'Waiting for a job ...', []),
  577      (   MaxIdle == infinite
  578      ->  thread_get_message(Queue, Message)
  579      ;   thread_get_message(Queue, Message, [timeout(MaxIdle)])
  580      ->  true
  581      ;   Message = quit(idle)
  582      ),
  583      debug(http(worker), 'Got job ~p', [Message]),
  584      (   Message = quit(Sender)
  585      ->  !,
  586          thread_self(Self),
  587          thread_detach(Self),
  588          (   Sender == idle
  589          ->  true
  590          ;   retract(queue_worker(Queue, Self)),
  591              thread_send_message(Sender, quitted(Self))
  592          )
  593      ;   open_client(Message, Queue, Goal, In, Out,
  594                      Options, ClientOptions),
  595          (   catch(http_process(Goal, In, Out, ClientOptions),
  596                    Error, true)
  597          ->  true
  598          ;   Error = goal_failed(http_process/4)
  599          ),
  600          (   var(Error)
  601          ->  fail
  602          ;   current_message_level(Error, Level),
  603              print_message(Level, Error),
  604              memberchk(peer(Peer), ClientOptions),
  605              close_connection(Peer, In, Out),
  606              fail
  607          )
  608      ).
  609
  610
  616
  617open_client(requeue(In, Out, Goal, ClOpts),
  618            _, Goal, In, Out, Opts, ClOpts) :-
  619    !,
  620    memberchk(peer(Peer), ClOpts),
  621    option(keep_alive_timeout(KeepAliveTMO), Opts, 2),
  622    check_keep_alive_connection(In, KeepAliveTMO, Peer, In, Out).
  623open_client(Message, Queue, Goal, In, Out, Opts,
  624            [ pool(client(Queue, Goal, In, Out)),
  625              timeout(Timeout)
  626            | Options
  627            ]) :-
  628    catch(open_client(Message, Goal, In, Out, Options, Opts),
  629          E, report_error(E)),
  630    option(timeout(Timeout), Opts, 60),
  631    (   debugging(http(connection))
  632    ->  memberchk(peer(Peer), Options),
  633        debug(http(connection), 'Opened connection from ~p', [Peer])
  634    ;   true
  635    ).
  636
  637
  640
  641open_client(Message, Goal, In, Out, ClientOptions, Options) :-
  642    open_client_hook(Message, Goal, In, Out, ClientOptions, Options),
  643    !.
  644open_client(tcp_client(Socket, Goal, Peer), Goal, In, Out,
  645            [ peer(Peer),
  646              protocol(http)
  647            ], _) :-
  648    tcp_open_socket(Socket, In, Out).
  649
  650report_error(E) :-
  651    print_message(error, E),
  652    fail.
  653
  654
  660
  661check_keep_alive_connection(In, TMO, Peer, In, Out) :-
  662    stream_property(In, timeout(Old)),
  663    set_stream(In, timeout(TMO)),
  664    debug(http(keep_alive), 'Waiting for keep-alive ...', []),
  665    catch(peek_code(In, Code), E, true),
  666    (   var(E),                       667        Code \== -1                   668    ->  set_stream(In, timeout(Old)),
  669        debug(http(keep_alive), '\tre-using keep-alive connection', [])
  670    ;   (   Code == -1
  671        ->  debug(http(keep_alive), '\tRemote closed keep-alive connection', [])
  672        ;   debug(http(keep_alive), '\tTimeout on keep-alive connection', [])
  673        ),
  674        close_connection(Peer, In, Out),
  675        fail
  676    ).
  677
  678
  684
  685done_worker :-
  686    thread_self(Self),
  687    thread_detach(Self),
  688    retract(queue_worker(Queue, Self)),
  689    thread_property(Self, status(Status)),
  690    !,
  691    (   catch(recreate_worker(Status, Queue), _, fail)
  692    ->  print_message(informational,
  693                      httpd_restarted_worker(Self))
  694    ;   done_status_message_level(Status, Level),
  695        print_message(Level,
  696                      httpd_stopped_worker(Self, Status))
  697    ).
  698done_worker :-                                    699    thread_self(Self),
  700    thread_property(Self, status(Status)),
  701    done_status_message_level(Status, Level),
  702    print_message(Level,
  703                  httpd_stopped_worker(Self, Status)).
  704
  705done_status_message_level(true, silent) :- !.
  706done_status_message_level(exception('$aborted'), silent) :- !.
  707done_status_message_level(_, informational).
  708
  709
  721
  722recreate_worker(exception(error(io_error(write,user_error),_)), _Queue) :-
  723    halt(2).
  724recreate_worker(exception(Error), Queue) :-
  725    recreate_on_error(Error),
  726    queue_options(Queue, Options),
  727    atom_concat(Queue, '_', AliasBase),
  728    create_workers(1, 1, Queue, AliasBase, Options).
  729
  730recreate_on_error('$aborted').
  731recreate_on_error(time_limit_exceeded).
  732
  739
  740:- multifile
  741    message_level/2.  742
  743message_level(error(io_error(read, _), _),      silent).
  744message_level(error(socket_error(epipe,_), _),	silent).
  745message_level(error(timeout_error(read, _), _), informational).
  746message_level(keep_alive_timeout,               silent).
  747
  748current_message_level(Term, Level) :-
  749    (   message_level(Term, Level)
  750    ->  true
  751    ;   Level = error
  752    ).
  753
  754
  759
  760http_requeue(Header) :-
  761    requeue_header(Header, ClientOptions),
  762    memberchk(pool(client(Queue, Goal, In, Out)), ClientOptions),
  763    memberchk(peer(Peer), ClientOptions),
  764    http_enough_workers(Queue, keep_alive, Peer),
  765    thread_send_message(Queue, requeue(In, Out, Goal, ClientOptions)),
  766    !.
  767http_requeue(Header) :-
  768    debug(http(error), 'Re-queue failed: ~p', [Header]),
  769    fail.
  770
%!  requeue_header(+Header, -ClientOptions)
%
%   ClientOptions holds, in their original order, the elements of
%   Header that requeue_keep/1 accepts. The base clause for the empty
%   list terminates the recursion; without it the predicate would fail
%   for every input.

requeue_header([], []).
requeue_header([H|T0], [H|T]) :-
    requeue_keep(H),
    !,
    requeue_header(T0, T).
requeue_header([_|T0], T) :-
    requeue_header(T0, T).

%!  requeue_keep(?Field)
%
%   Header fields that survive re-queueing a connection.

requeue_keep(pool(_)).
requeue_keep(peer(_)).
requeue_keep(protocol(_)).
  782
  783
  787
  788http_process(Goal, In, Out, Options) :-
  789    debug(http(server), 'Running server goal ~p on ~p -> ~p',
  790          [Goal, In, Out]),
  791    option(timeout(TMO), Options, 60),
  792    set_stream(In, timeout(TMO)),
  793    set_stream(Out, timeout(TMO)),
  794    http_wrapper(Goal, In, Out, Connection,
  795                 [ request(Request)
  796                 | Options
  797                 ]),
  798    next(Connection, Request).
  799
  800next(switch_protocol(SwitchGoal, _SwitchOptions), Request) :-
  801    !,
  802    memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
  803    (   catch(call(SwitchGoal, In, Out), E,
  804              (   print_message(error, E),
  805                  fail))
  806    ->  true
  807    ;   http_close_connection(Request)
  808    ).
  809next(spawned(ThreadId), _) :-
  810    !,
  811    debug(http(spawn), 'Handler spawned to thread ~w', [ThreadId]).
  812next(Connection, Request) :-
  813    downcase_atom(Connection, 'keep-alive'),
  814    http_requeue(Request),
  815    !.
  816next(_, Request) :-
  817    http_close_connection(Request).
  818
  819
  823
  824http_close_connection(Request) :-
  825    memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
  826    memberchk(peer(Peer), Request),
  827    close_connection(Peer, In, Out).
  828
  833
  834close_connection(Peer, In, Out) :-
  835    debug(http(connection), 'Closing connection from ~p', [Peer]),
  836    catch(close(In, [force(true)]), _, true),
  837    catch(close(Out, [force(true)]), _, true).
  838
  854
  855http_spawn(Goal, Options) :-
  856    select_option(pool(Pool), Options, ThreadOptions),
  857    !,
  858    current_output(CGI),
  859    catch(thread_create_in_pool(Pool,
  860                                wrap_spawned(CGI, Goal), Id,
  861                                [ detached(true)
  862                                | ThreadOptions
  863                                ]),
  864          Error,
  865          true),
  866    (   var(Error)
  867    ->  http_spawned(Id)
  868    ;   Error = error(resource_error(threads_in_pool(_)), _)
  869    ->  throw(http_reply(busy))
  870    ;   Error = error(existence_error(thread_pool, Pool), _),
  871        create_pool(Pool)
  872    ->  http_spawn(Goal, Options)
  873    ;   throw(Error)
  874    ).
  875http_spawn(Goal, Options) :-
  876    current_output(CGI),
  877    thread_create(wrap_spawned(CGI, Goal), Id,
  878                  [ detached(true)
  879                  | Options
  880                  ]),
  881    http_spawned(Id).
  882
  883wrap_spawned(CGI, Goal) :-
  884    set_output(CGI),
  885    http_wrap_spawned(Goal, Request, Connection),
  886    next(Connection, Request).
  887
  895
  896create_pool(Pool) :-
  897    E = error(permission_error(create, thread_pool, Pool), _),
  898    catch(http:create_pool(Pool), E, true).
  899create_pool(Pool) :-
  900    print_message(informational, httpd(created_pool(Pool))),
  901    thread_pool_create(Pool, 10, []).
  902
  903
  904
  905                   908
  909:- multifile
  910    prolog:message/3.  911
  912prolog:message(httpd_started_server(Port, Options)) -->
  913    [ 'Started server at '-[] ],
  914    http_root(Port, Options).
  915prolog:message(httpd_stopped_worker(Self, Status)) -->
  916    [ 'Stopped worker ~p: ~p'-[Self, Status] ].
  917prolog:message(httpd_restarted_worker(Self)) -->
  918    [ 'Replaced aborted worker ~p'-[Self] ].
  919prolog:message(httpd(created_pool(Pool))) -->
  920    [ 'Created thread-pool ~p of size 10'-[Pool], nl,
  921      'Create this pool at startup-time or define the hook ', nl,
  922      'http:create_pool/1 to avoid this message and create a ', nl,
  923      'pool that fits the usage-profile.'
  924    ].
  925
  926http_root(Address, Options) -->
  927    { landing_page(Address, URI, Options) },
  928    [ '~w'-[URI] ].
  929
  930landing_page(Host:Port, URI, Options) :-
  931    must_be(atom, Host),
  932    http_server_property(Port, scheme(Scheme)),
  933    (   default_port(Scheme, Port)
  934    ->  format(atom(Base), '~w://~w', [Scheme, Host])
  935    ;   format(atom(Base), '~w://~w:~w', [Scheme, Host, Port])
  936    ),
  937    entry_page(Base, URI, Options).
  938landing_page(Port, URI, Options) :-
  939    landing_page(localhost:Port, URI, Options).
  940
  941default_port(http, 80).
  942default_port(https, 443).
  943
  944entry_page(Base, URI, Options) :-
  945    option(entry_page(Entry), Options),
  946    !,
  947    uri_resolve(Entry, Base, URI).
  948entry_page(Base, URI, _) :-
  949    http_absolute_location(root(.), Entry, []),
  950    uri_resolve(Entry, Base, URI)