35
36:- module(http_dyn_workers,
37 [
38 ]). 39:- use_module(library(http/thread_httpd)). 40:- use_module(library(debug)). 41:- use_module(library(settings)). 42:- use_module(library(aggregate)). 43
% Tunable parameters of the dynamic worker scheduler.  reschedule/3
% reads them through setting/2 every time it handles a request for
% more workers, so changed values take effect on the next decision.

% Used to scale the delay before a new worker is added.
:- setting(http:max_workers,
           integer,
           100,
           "Maximum number of workers to create").

% Passed as max_idle_time(Limit) to every worker that is added.
:- setting(http:worker_idle_limit,
           number,
           10,
           "Terminate a dynamic worker when idle for this time").

% No workers are added while the measured load exceeds this value.
:- setting(http:max_load,
           number,
           10,
           "Maximum load average caused by HTTP workers").
:- multifile
    http:schedule_workers/1.

%!  http:schedule_workers(+Dict) is semidet.
%
%   Hook that reports a shortage of workers to the scheduler thread.
%   The request is posted as no_workers(Time, Dict) to the message
%   queue of the thread aliased '__http_scheduler', where Time is the
%   moment of reporting.  If that queue does not exist, the scheduler
%   thread is created first and the request is retried.  Fails if the
%   scheduler thread cannot be created.
%
%   @arg Dict is passed on unchanged; reschedule/3 reads its `port`
%   and `waiting` keys and, if present, its `queue` key.

http:schedule_workers(Dict) :-
    get_time(Reported),
    Request = no_workers(Reported, Dict),
    % Only a missing queue is treated as "no scheduler yet"; any other
    % error is propagated to the caller.
    NoScheduler = error(existence_error(message_queue, _), _),
    catch(thread_send_message('__http_scheduler', Request),
          NoScheduler,
          fail),
    !.
http:schedule_workers(Dict) :-
    % No scheduler is running: start one and post the request again.
    create_scheduler,
    http:schedule_workers(Dict).
84
%!  create_scheduler is semidet.
%
%   Start the scheduler as a detached thread with the alias
%   '__http_scheduler', running http_scheduler/0.  Any error(_,_)
%   exception raised by thread_create/3 is turned into failure.

create_scheduler :-
    ThreadOptions = [ alias('__http_scheduler'),
                      inherit_from(main),
                      debug(false),
                      detached(true)
                    ],
    catch(thread_create(http_scheduler, _, ThreadOptions),
          error(_,_),
          fail).
94
%!  http_scheduler
%
%   Entry point of the scheduler thread.  Builds the initial state
%   dict, holding a zero `waiting` count and the start time, and
%   enters the message loop http_scheduler/1.

http_scheduler :-
    get_time(Started),
    InitialState = _{ waiting:0,
                      time:Started
                    },
    http_scheduler(InitialState).
100
%!  http_scheduler(+State)
%
%   Message loop of the scheduler thread.  Each iteration waits up to
%   10 seconds for a task on the queue of the current thread; when
%   nothing arrives in time the task `update_load_avg` is used.
%   The task is handed to reschedule/3.  If that succeeds the loop
%   continues with the updated state; if it fails or raises an
%   exception (which is printed as a warning) the loop continues with
%   the old state.  The loop does not terminate by itself.

http_scheduler(State) :-
    (   thread_self(Self),
        thread_get_message(Self, Task, [timeout(10)])
    ->  true
    ;   Task = update_load_avg
    ),
    (   catch(reschedule(Task, State, Updated),
              Error,
              ( print_message(warning, Error),
                fail))
    ->  NextState = Updated
    ;   NextState = State
    ),
    !,
    http_scheduler(NextState).
%!  reschedule(+Task, +State0, -State) is semidet.
%
%   Handle one scheduler task.
%
%     - no_workers(Reported, Dict)
%       Refresh the load estimate.  If it exceeds the setting
%       http:max_load nothing else is done.  Otherwise wait for
%       0.001*(MaxWorkers/max(1, MaxWorkers-Workers)) seconds counted
%       from Reported, and add one worker to the server at Dict.port
%       if its accept queue is still non-empty after the wait.
%     - update_load_avg
%       Only refresh the load estimate.
%
%   Fails for any other task.

reschedule(no_workers(Reported, Dict), State0, State) :-
    update_load_avg(Dict, State0, State, Load),
    setting(http:max_load, LoadLimit),
    (   Load > LoadLimit
    ->  debug(http(scheduler), 'Load ~1f > ~1f; not adding workers',
              [ Load, LoadLimit ])
    ;   Port = Dict.port,
        aggregate_all(count, http_current_worker(Port, _), Active),
        setting(http:max_workers, Limit),
        % The delay grows as the pool approaches the configured limit.
        Delay is 0.001*(Limit/max(1, Limit-Active)),
        get_time(Current),
        % Subtract the time that already passed since the report.
        Pause is Delay + Reported-Current,
        debug(http(scheduler), 'Waiting: ~w; active: ~w; sleep: ~3f; load: ~1f',
              [Dict.waiting, Active, Pause, Load]),
        sleep(Pause),
        accept_queue(Dict, Queue),
        message_queue_property(Queue, size(Pending)),
        (   Pending \== 0
        ->  debug(http(scheduler), 'Size is ~w: adding worker', [Pending]),
            setting(http:worker_idle_limit, IdleLimit),
            http_add_worker(Port,
                            [ max_idle_time(IdleLimit)
                            ])
        ;   debug(http(scheduler), 'Drained', [])
        )
    ).
reschedule(update_load_avg, State0, State) :-
    % Periodic refresh: there is no request dict, so pass an empty one.
    update_load_avg(_{}, State0, State, _).
145
%!  update_load_avg(+Dict, +State0, -State, -Load) is det.
%
%   Maintain the load estimate kept under the `load` key of the
%   scheduler state and return its current value.  The raw load is the
%   CPU time consumed by the workers of the server port since the
%   previous sample, divided by the wall time since that sample; it is
%   smoothed by smooth_load/3.
%
%     1. If the state holds a load value whose stamp is less than
%        10 seconds old, that value is returned and the state is left
%        unchanged.
%     2. Otherwise, if the server port is known (see server_port/4), a
%        new sample is taken.  The very first sample only records the
%        stamp and CPU time and reports a load of 0.
%     3. If the port cannot be determined, the load is 0 and the state
%        is left unchanged.
%
%   @arg Dict is the request dict; its `port` key is used when the
%   state does not know the port yet.

update_load_avg(_Dict, State, State, Load) :-
    _{stamp:Last, load:Load} :< State.get(load),
    get_time(Now),
    Now - Last < 10,
    % Commit: without this cut a later failure in the caller would
    % backtrack into the next clause and take a new sample.
    !.
update_load_avg(Dict, State0, State, Load) :-
    server_port(Dict, State0, State1, Port),
    !,
    aggregate_all(sum(CPU), worker_cpu(Port, CPU), CPU1),
    get_time(Now),
    (   LoadDict = State1.get(load),
        _{stamp:Last, cpu:LastCPU} :< LoadDict
    ->  Load0 is (CPU1-LastCPU)/(Now-Last),
        smooth_load(LoadDict, Load0, Load),
        State = State1.put(load, _{stamp:Now, cpu:CPU1, load:Load})
    ;   % First sample: nothing to compare against yet.
        State = State1.put(load, _{stamp:Now, cpu:CPU1}),
        Load = 0
    ).
% Port unknown: report no load and hand back the state unchanged, so
% the scheduler loop never continues with an unbound state.
update_load_avg(_, State, State, 0).
164
%!  worker_cpu(+Port, -CPU) is nondet.
%
%   True when CPU is the `cputime` statistic of a worker thread of the
%   HTTP server at Port.  Enumerates the workers on backtracking.  A
%   worker for which thread_statistics/3 raises an exception is
%   skipped.

worker_cpu(Port, CPU) :-
    http_current_worker(Port, Worker),
    Statistics = thread_statistics(Worker, cputime, CPU),
    catch(Statistics, _, fail).
168
%!  server_port(+Dict, +State0, -State, -Port) is semidet.
%
%   Port is the server port.  It is taken from the `port` key of the
%   state when present, leaving the state unchanged.  Otherwise it is
%   taken from the `port` key of Dict and stored in the state.  Fails
%   if neither dict has a `port` key.

server_port(Dict, State0, State, Port) :-
    (   State = State0,
        Port = State0.get(port)
    ->  true
    ;   Port = Dict.get(port),
        State = State0.put(port, Port)
    ).
175
%!  smooth_load(+LoadDict, +Load0, -Load) is det.
%
%   Blend the new sample Load0 with the previous load.  If LoadDict
%   has a `load` key, Load is the weighted mean that counts the old
%   value five times and the new sample once.  Without a previous
%   value, Load is simply Load0.

smooth_load(LoadDict, Load0, Load) :-
    (   Previous = LoadDict.get(load)
    ->  Load is (5*Previous+Load0)/6
    ;   Load = Load0
    ).
%!  accept_queue(+Dict, -Queue) is semidet.
%
%   Queue is the message queue inspected by reschedule/3 to see whether
%   requests are still pending.  It is taken from the `queue` key of
%   Dict when present; otherwise it is looked up from the server
%   registered for Dict.port.  Fails if no such server is found.

accept_queue(Dict, Queue) :-
    Queue = Dict.get(queue),
    !.
accept_queue(Dict, Queue) :-
    thread_httpd:current_server(Dict.port, _, _, Queue, _, _),
    !.
/** <module> Dynamically schedule HTTP workers.
This module defines hooks into the HTTP framework to dynamically schedule worker threads. Dynamic scheduling relieves us from finding a good value for the size of the HTTP worker pool.
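The decision to add a worker follows these rules:

  - If the load average caused by the HTTP workers exceeds the setting
    http:max_load, no worker is added.
  - Otherwise the scheduler waits for
    0.001*(MaxWorkers/max(1, MaxWorkers-Workers)) seconds, counted from
    the moment the shortage was reported. If the accept queue of the
    server is still non-empty after that, one worker is added. This
    worker terminates when it has been idle for http:worker_idle_limit.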
*/