36
37:- module(thread_httpd,
38 [ http_current_server/2, 39 http_server_property/2, 40 http_server/2, 41 http_workers/2, 42 http_add_worker/2, 43 http_current_worker/2, 44 http_stop_server/2, 45 http_spawn/2, 46
47 http_requeue/1, 48 http_close_connection/1, 49 http_enough_workers/3 50 ]). 51:- use_module(library(debug)). 52:- use_module(library(error)). 53:- use_module(library(option)). 54:- use_module(library(socket)). 55:- use_module(library(thread_pool)). 56:- use_module(library(gensym)). 57:- use_module(http_wrapper). 58:- use_module(http_path). 59
60
61:- predicate_options(http_server/2, 2,
62 [ port(any),
63 entry_page(atom),
64 tcp_socket(any),
65 workers(positive_integer),
66 timeout(number),
67 keep_alive_timeout(number),
68 silent(boolean),
69 ssl(list(any)), 70 pass_to(system:thread_create/3, 3)
71 ]). 72:- predicate_options(http_spawn/2, 2,
73 [ pool(atom),
74 pass_to(system:thread_create/3, 3),
75 pass_to(thread_pool:thread_create_in_pool/4, 4)
76 ]). 77:- predicate_options(http_add_worker/2, 2,
78 [ timeout(number),
79 keep_alive_timeout(number),
80 max_idle_time(number),
81 pass_to(system:thread_create/3, 3)
82 ]). 83
105
106:- meta_predicate
107 http_server(1, :),
108 http_current_server(1, ?),
109 http_spawn(0, +). 110
111:- dynamic
112 current_server/6, 113 queue_worker/2, 114 queue_options/2. 115
116:- multifile
117 make_socket_hook/3,
118 accept_hook/2,
119 close_hook/1,
120 open_client_hook/6,
121 http:create_pool/1,
122 http:schedule_workers/1. 123
182
183http_server(Goal, M:Options0) :-
184 option(port(Port), Options0),
185 !,
186 make_socket(Port, M:Options0, Options),
187 create_workers(Options),
188 create_server(Goal, Port, Options),
189 ( option(silent(true), Options0)
190 -> true
191 ; print_message(informational,
192 httpd_started_server(Port, Options0))
193 ).
194http_server(_Goal, _Options) :-
195 existence_error(option, port).
196
197
205
206make_socket(Port, Options0, Options) :-
207 make_socket_hook(Port, Options0, Options),
208 !.
209make_socket(Port, _:Options0, Options) :-
210 option(tcp_socket(_), Options0),
211 !,
212 make_addr_atom('httpd', Port, Queue),
213 Options = [ queue(Queue)
214 | Options0
215 ].
216make_socket(Port, _:Options0, Options) :-
217 tcp_socket(Socket),
218 tcp_setopt(Socket, reuseaddr),
219 tcp_bind(Socket, Port),
220 tcp_listen(Socket, 5),
221 make_addr_atom('httpd', Port, Queue),
222 Options = [ queue(Queue),
223 tcp_socket(Socket)
224 | Options0
225 ].
226
231
232make_addr_atom(Scheme, Address, Atom) :-
233 phrase(address_parts(Address), Parts),
234 atomic_list_concat([Scheme,@|Parts], Atom).
235
236address_parts(Atomic) -->
237 { atomic(Atomic) },
238 !,
239 [Atomic].
240address_parts(Host:Port) -->
241 !,
242 address_parts(Host), [:], address_parts(Port).
243address_parts(ip(A,B,C,D)) -->
244 !,
245 [ A, '.', B, '.', C, '.', D ].
246
251
252create_server(Goal, Address, Options) :-
253 get_time(StartTime),
254 memberchk(queue(Queue), Options),
255 scheme(Scheme, Options),
256 address_port(Address, Port),
257 make_addr_atom(Scheme, Port, Alias),
258 thread_self(Initiator),
259 thread_create(accept_server(Goal, Initiator, Options), _,
260 [ alias(Alias)
261 ]),
262 thread_get_message(server_started),
263 assert(current_server(Port, Goal, Alias, Queue, Scheme, StartTime)).
264
265scheme(Scheme, Options) :-
266 option(scheme(Scheme), Options),
267 !.
268scheme(Scheme, Options) :-
269 ( option(ssl(_), Options)
270 ; option(ssl_instance(_), Options)
271 ),
272 !,
273 Scheme = https.
274scheme(http, _).
275
276address_port(_Host:Port, Port) :- !.
277address_port(Port, Port).
278
279
285
286http_current_server(Goal, Port) :-
287 current_server(Port, Goal, _, _, _, _).
288
289
302
303http_server_property(_:Port, Property) :-
304 integer(Port),
305 !,
306 server_property(Property, Port).
307http_server_property(Port, Property) :-
308 server_property(Property, Port).
309
310server_property(goal(Goal), Port) :-
311 current_server(Port, Goal, _, _, _, _).
312server_property(scheme(Scheme), Port) :-
313 current_server(Port, _, _, _, Scheme, _).
314server_property(start_time(Time), Port) :-
315 current_server(Port, _, _, _, _, Time).
316
317
324
325http_workers(Port, Workers) :-
326 must_be(ground, Port),
327 current_server(Port, _, _, Queue, _, _),
328 !,
329 ( integer(Workers)
330 -> resize_pool(Queue, Workers)
331 ; findall(W, queue_worker(Queue, W), WorkerIDs),
332 length(WorkerIDs, Workers)
333 ).
334http_workers(Port, _) :-
335 existence_error(http_server, Port).
336
337
347
348http_add_worker(Port, Options) :-
349 must_be(ground, Port),
350 current_server(Port, _, _, Queue, _, _),
351 !,
352 queue_options(Queue, QueueOptions),
353 merge_options(Options, QueueOptions, WorkerOptions),
354 atom_concat(Queue, '_', AliasBase),
355 create_workers(1, 1, Queue, AliasBase, WorkerOptions).
356http_add_worker(Port, _) :-
357 existence_error(http_server, Port).
358
359
366
367http_current_worker(Port, ThreadID) :-
368 current_server(Port, _, _, Queue, _, _),
369 queue_worker(Queue, ThreadID).
370
371
376
377accept_server(Goal, Initiator, Options) :-
378 catch(accept_server2(Goal, Initiator, Options), http_stop, true),
379 thread_self(Thread),
380 retract(current_server(_Port, _, Thread, _Queue, _Scheme, _StartTime)),
381 close_server_socket(Options).
382
383accept_server2(Goal, Initiator, Options) :-
384 thread_send_message(Initiator, server_started),
385 repeat,
386 ( catch(accept_server3(Goal, Options), E, true)
387 -> ( var(E)
388 -> fail
389 ; accept_rethrow_error(E)
390 -> throw(E)
391 ; print_message(error, E),
392 fail
393 )
394 ; print_message(error, 395 goal_failed(accept_server3(Goal, Options))),
396 fail
397 ).
398
399accept_server3(Goal, Options) :-
400 accept_hook(Goal, Options),
401 !.
402accept_server3(Goal, Options) :-
403 memberchk(tcp_socket(Socket), Options),
404 memberchk(queue(Queue), Options),
405 tcp_accept(Socket, Client, Peer),
406 debug(http(connection), 'New HTTP connection from ~p', [Peer]),
407 thread_send_message(Queue, tcp_client(Client, Goal, Peer)),
408 http_enough_workers(Queue, accept, Peer).
409
410accept_rethrow_error(http_stop).
411accept_rethrow_error('$aborted').
412
413
417
418close_server_socket(Options) :-
419 close_hook(Options),
420 !.
421close_server_socket(Options) :-
422 memberchk(tcp_socket(Socket), Options),
423 !,
424 tcp_close_socket(Socket).
425
426
433
434http_stop_server(Host:Port, Options) :- 435 ground(Host),
436 !,
437 http_stop_server(Port, Options).
438http_stop_server(Port, _Options) :-
439 http_workers(Port, 0), 440 current_server(Port, _, Thread, Queue, _Scheme, _Start),
441 retractall(queue_options(Queue, _)),
442 thread_signal(Thread, throw(http_stop)),
443 catch(connect(localhost:Port), _, true),
444 thread_join(Thread, _),
445 message_queue_destroy(Queue).
446
447connect(Address) :-
448 setup_call_cleanup(
449 tcp_socket(Socket),
450 tcp_connect(Socket, Address),
451 tcp_close_socket(Socket)).
452
458
459http_enough_workers(Queue, Why, Peer) :-
460 message_queue_property(Queue, size(Size)),
461 ( enough(Size, Why)
462 -> true
463 ; current_server(Port, _, _, Queue, _, _),
464 catch(http:schedule_workers(_{port:Port,
465 reason:Why,
466 peer:Peer,
467 waiting:Size,
468 queue:Queue
469 }),
470 Error,
471 print_message(error, Error))
472 -> true
473 ; true
474 ).
475
476enough(0, _).
477enough(1, keep_alive). 478
479
505
506
507 510
515
516create_workers(Options) :-
517 option(workers(N), Options, 5),
518 option(queue(Queue), Options),
519 catch(message_queue_create(Queue), _, true),
520 atom_concat(Queue, '_', AliasBase),
521 create_workers(1, N, Queue, AliasBase, Options),
522 assert(queue_options(Queue, Options)).
523
524create_workers(I, N, _, _, _) :-
525 I > N,
526 !.
527create_workers(I, N, Queue, AliasBase, Options) :-
528 gensym(AliasBase, Alias),
529 thread_create(http_worker(Options), Id,
530 [ alias(Alias)
531 | Options
532 ]),
533 assertz(queue_worker(Queue, Id)),
534 I2 is I + 1,
535 create_workers(I2, N, Queue, AliasBase, Options).
536
537
542
543resize_pool(Queue, Size) :-
544 findall(W, queue_worker(Queue, W), Workers),
545 length(Workers, Now),
546 ( Now < Size
547 -> queue_options(Queue, Options),
548 atom_concat(Queue, '_', AliasBase),
549 I0 is Now+1,
550 create_workers(I0, Size, Queue, AliasBase, Options)
551 ; Now == Size
552 -> true
553 ; Now > Size
554 -> Excess is Now - Size,
555 thread_self(Me),
556 forall(between(1, Excess, _), thread_send_message(Queue, quit(Me))),
557 forall(between(1, Excess, _), thread_get_message(quitted(_)))
558 ).
559
560
568
569http_worker(Options) :-
570 thread_at_exit(done_worker),
571 option(queue(Queue), Options),
572 option(max_idle_time(MaxIdle), Options, infinite),
573 repeat,
574 garbage_collect,
575 trim_stacks,
576 debug(http(worker), 'Waiting for a job ...', []),
577 ( MaxIdle == infinite
578 -> thread_get_message(Queue, Message)
579 ; thread_get_message(Queue, Message, [timeout(MaxIdle)])
580 -> true
581 ; Message = quit(idle)
582 ),
583 debug(http(worker), 'Got job ~p', [Message]),
584 ( Message = quit(Sender)
585 -> !,
586 thread_self(Self),
587 thread_detach(Self),
588 ( Sender == idle
589 -> true
590 ; retract(queue_worker(Queue, Self)),
591 thread_send_message(Sender, quitted(Self))
592 )
593 ; open_client(Message, Queue, Goal, In, Out,
594 Options, ClientOptions),
595 ( catch(http_process(Goal, In, Out, ClientOptions),
596 Error, true)
597 -> true
598 ; Error = goal_failed(http_process/4)
599 ),
600 ( var(Error)
601 -> fail
602 ; current_message_level(Error, Level),
603 print_message(Level, Error),
604 memberchk(peer(Peer), ClientOptions),
605 close_connection(Peer, In, Out),
606 fail
607 )
608 ).
609
610
616
617open_client(requeue(In, Out, Goal, ClOpts),
618 _, Goal, In, Out, Opts, ClOpts) :-
619 !,
620 memberchk(peer(Peer), ClOpts),
621 option(keep_alive_timeout(KeepAliveTMO), Opts, 2),
622 check_keep_alive_connection(In, KeepAliveTMO, Peer, In, Out).
623open_client(Message, Queue, Goal, In, Out, Opts,
624 [ pool(client(Queue, Goal, In, Out)),
625 timeout(Timeout)
626 | Options
627 ]) :-
628 catch(open_client(Message, Goal, In, Out, Options, Opts),
629 E, report_error(E)),
630 option(timeout(Timeout), Opts, 60),
631 ( debugging(http(connection))
632 -> memberchk(peer(Peer), Options),
633 debug(http(connection), 'Opened connection from ~p', [Peer])
634 ; true
635 ).
636
637
640
641open_client(Message, Goal, In, Out, ClientOptions, Options) :-
642 open_client_hook(Message, Goal, In, Out, ClientOptions, Options),
643 !.
644open_client(tcp_client(Socket, Goal, Peer), Goal, In, Out,
645 [ peer(Peer),
646 protocol(http)
647 ], _) :-
648 tcp_open_socket(Socket, In, Out).
649
650report_error(E) :-
651 print_message(error, E),
652 fail.
653
654
660
661check_keep_alive_connection(In, TMO, Peer, In, Out) :-
662 stream_property(In, timeout(Old)),
663 set_stream(In, timeout(TMO)),
664 debug(http(keep_alive), 'Waiting for keep-alive ...', []),
665 catch(peek_code(In, Code), E, true),
666 ( var(E), 667 Code \== -1 668 -> set_stream(In, timeout(Old)),
669 debug(http(keep_alive), '\tre-using keep-alive connection', [])
670 ; ( Code == -1
671 -> debug(http(keep_alive), '\tRemote closed keep-alive connection', [])
672 ; debug(http(keep_alive), '\tTimeout on keep-alive connection', [])
673 ),
674 close_connection(Peer, In, Out),
675 fail
676 ).
677
678
684
685done_worker :-
686 thread_self(Self),
687 thread_detach(Self),
688 retract(queue_worker(Queue, Self)),
689 thread_property(Self, status(Status)),
690 !,
691 ( catch(recreate_worker(Status, Queue), _, fail)
692 -> print_message(informational,
693 httpd_restarted_worker(Self))
694 ; done_status_message_level(Status, Level),
695 print_message(Level,
696 httpd_stopped_worker(Self, Status))
697 ).
698done_worker :- 699 thread_self(Self),
700 thread_property(Self, status(Status)),
701 done_status_message_level(Status, Level),
702 print_message(Level,
703 httpd_stopped_worker(Self, Status)).
704
705done_status_message_level(true, silent) :- !.
706done_status_message_level(exception('$aborted'), silent) :- !.
707done_status_message_level(_, informational).
708
709
721
722recreate_worker(exception(error(io_error(write,user_error),_)), _Queue) :-
723 halt(2).
724recreate_worker(exception(Error), Queue) :-
725 recreate_on_error(Error),
726 queue_options(Queue, Options),
727 atom_concat(Queue, '_', AliasBase),
728 create_workers(1, 1, Queue, AliasBase, Options).
729
730recreate_on_error('$aborted').
731recreate_on_error(time_limit_exceeded).
732
739
740:- multifile
741 message_level/2. 742
743message_level(error(io_error(read, _), _), silent).
744message_level(error(socket_error(epipe,_), _), silent).
745message_level(error(timeout_error(read, _), _), informational).
746message_level(keep_alive_timeout, silent).
747
748current_message_level(Term, Level) :-
749 ( message_level(Term, Level)
750 -> true
751 ; Level = error
752 ).
753
754
759
760http_requeue(Header) :-
761 requeue_header(Header, ClientOptions),
762 memberchk(pool(client(Queue, Goal, In, Out)), ClientOptions),
763 memberchk(peer(Peer), ClientOptions),
764 http_enough_workers(Queue, keep_alive, Peer),
765 thread_send_message(Queue, requeue(In, Out, Goal, ClientOptions)),
766 !.
767http_requeue(Header) :-
768 debug(http(error), 'Re-queue failed: ~p', [Header]),
769 fail.
770
([], []).
772requeue_header([H|T0], [H|T]) :-
773 requeue_keep(H),
774 !,
775 requeue_header(T0, T).
776requeue_header([_|T0], T) :-
777 requeue_header(T0, T).
778
779requeue_keep(pool(_)).
780requeue_keep(peer(_)).
781requeue_keep(protocol(_)).
782
783
787
788http_process(Goal, In, Out, Options) :-
789 debug(http(server), 'Running server goal ~p on ~p -> ~p',
790 [Goal, In, Out]),
791 option(timeout(TMO), Options, 60),
792 set_stream(In, timeout(TMO)),
793 set_stream(Out, timeout(TMO)),
794 http_wrapper(Goal, In, Out, Connection,
795 [ request(Request)
796 | Options
797 ]),
798 next(Connection, Request).
799
800next(switch_protocol(SwitchGoal, _SwitchOptions), Request) :-
801 !,
802 memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
803 ( catch(call(SwitchGoal, In, Out), E,
804 ( print_message(error, E),
805 fail))
806 -> true
807 ; http_close_connection(Request)
808 ).
809next(spawned(ThreadId), _) :-
810 !,
811 debug(http(spawn), 'Handler spawned to thread ~w', [ThreadId]).
812next(Connection, Request) :-
813 downcase_atom(Connection, 'keep-alive'),
814 http_requeue(Request),
815 !.
816next(_, Request) :-
817 http_close_connection(Request).
818
819
823
824http_close_connection(Request) :-
825 memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
826 memberchk(peer(Peer), Request),
827 close_connection(Peer, In, Out).
828
833
834close_connection(Peer, In, Out) :-
835 debug(http(connection), 'Closing connection from ~p', [Peer]),
836 catch(close(In, [force(true)]), _, true),
837 catch(close(Out, [force(true)]), _, true).
838
854
855http_spawn(Goal, Options) :-
856 select_option(pool(Pool), Options, ThreadOptions),
857 !,
858 current_output(CGI),
859 catch(thread_create_in_pool(Pool,
860 wrap_spawned(CGI, Goal), Id,
861 [ detached(true)
862 | ThreadOptions
863 ]),
864 Error,
865 true),
866 ( var(Error)
867 -> http_spawned(Id)
868 ; Error = error(resource_error(threads_in_pool(_)), _)
869 -> throw(http_reply(busy))
870 ; Error = error(existence_error(thread_pool, Pool), _),
871 create_pool(Pool)
872 -> http_spawn(Goal, Options)
873 ; throw(Error)
874 ).
875http_spawn(Goal, Options) :-
876 current_output(CGI),
877 thread_create(wrap_spawned(CGI, Goal), Id,
878 [ detached(true)
879 | Options
880 ]),
881 http_spawned(Id).
882
883wrap_spawned(CGI, Goal) :-
884 set_output(CGI),
885 http_wrap_spawned(Goal, Request, Connection),
886 next(Connection, Request).
887
895
896create_pool(Pool) :-
897 E = error(permission_error(create, thread_pool, Pool), _),
898 catch(http:create_pool(Pool), E, true).
899create_pool(Pool) :-
900 print_message(informational, httpd(created_pool(Pool))),
901 thread_pool_create(Pool, 10, []).
902
903
904
905 908
909:- multifile
910 prolog:message/3. 911
912prolog:message(httpd_started_server(Port, Options)) -->
913 [ 'Started server at '-[] ],
914 http_root(Port, Options).
915prolog:message(httpd_stopped_worker(Self, Status)) -->
916 [ 'Stopped worker ~p: ~p'-[Self, Status] ].
917prolog:message(httpd_restarted_worker(Self)) -->
918 [ 'Replaced aborted worker ~p'-[Self] ].
919prolog:message(httpd(created_pool(Pool))) -->
920 [ 'Created thread-pool ~p of size 10'-[Pool], nl,
921 'Create this pool at startup-time or define the hook ', nl,
922 'http:create_pool/1 to avoid this message and create a ', nl,
923 'pool that fits the usage-profile.'
924 ].
925
926http_root(Address, Options) -->
927 { landing_page(Address, URI, Options) },
928 [ '~w'-[URI] ].
929
930landing_page(Host:Port, URI, Options) :-
931 must_be(atom, Host),
932 http_server_property(Port, scheme(Scheme)),
933 ( default_port(Scheme, Port)
934 -> format(atom(Base), '~w://~w', [Scheme, Host])
935 ; format(atom(Base), '~w://~w:~w', [Scheme, Host, Port])
936 ),
937 entry_page(Base, URI, Options).
938landing_page(Port, URI, Options) :-
939 landing_page(localhost:Port, URI, Options).
940
941default_port(http, 80).
942default_port(https, 443).
943
944entry_page(Base, URI, Options) :-
945 option(entry_page(Entry), Options),
946 !,
947 uri_resolve(Entry, Base, URI).
948entry_page(Base, URI, _) :-
949 http_absolute_location(root(.), Entry, []),
950 uri_resolve(Entry, Base, URI)