35
36:- module(http_dyn_workers,
37 [
38 ]). 39:- use_module(library(http/thread_httpd)). 40:- use_module(library(debug)). 41:- use_module(library(settings)). 42:- use_module(library(aggregate)). 43
44:- setting(http:max_workers, integer, 100,
45 "Maximum number of workers to create"). 46:- setting(http:worker_idle_limit, number, 10,
47 "Terminate a dynamic worker when idle for this time"). 48:- setting(http:max_load, number, 10,
49 "Maximum load average caused by HTTP workers").
84:- multifile
85 http:schedule_workers/1. 86
87http:schedule_workers(Dict) :-
88 get_time(Now),
89 catch(thread_send_message('__http_scheduler', no_workers(Now, Dict)),
90 error(existence_error(message_queue, _), _),
91 fail),
92 !.
93http:schedule_workers(Dict) :-
94 create_scheduler,
95 http:schedule_workers(Dict).
96
97create_scheduler :-
98 catch(thread_create(http_scheduler, _,
99 [ alias('__http_scheduler'),
100 inherit_from(main),
101 debug(false),
102 detached(true)
103 ]),
104 error(_,_),
105 fail).
106
107http_scheduler :-
108 get_time(Now),
109 http_scheduler(_{ waiting:0,
110 time:Now
111 }).
112
113http_scheduler(State) :-
114 ( thread_self(Me),
115 thread_get_message(Me, Task, [timeout(10)])
116 -> true
117 ; Task = update_load_avg
118 ),
119 ( catch(reschedule(Task, State, State1),
120 Error,
121 ( print_message(warning, Error),
122 fail))
123 -> !,
124 http_scheduler(State1)
125 ; http_scheduler(State)
126 ).
130reschedule(no_workers(Reported, Dict), State0, State) :-
131 update_load_avg(Dict, State0, State, Load),
132 setting(http:max_load, MaxLoad),
133 ( Load > MaxLoad
134 -> debug(http(scheduler), 'Load ~1f > ~1f; not adding workers',
135 [ Load, MaxLoad ])
136 ; aggregate_all(count, http_current_worker(Dict.port, _), Workers),
137 setting(http:max_workers, MaxWorkers),
138 Wait is 0.001*(MaxWorkers/max(1, MaxWorkers-Workers)),
139 get_time(Now),
140 Sleep is Wait + Reported-Now,
141 debug(http(scheduler), 'Waiting: ~w; active: ~w; sleep: ~3f; load: ~1f',
142 [Dict.waiting, Workers, Sleep, Load]),
143 sleep(Sleep),
144 accept_queue(Dict, Queue),
145 message_queue_property(Queue, size(Newsize)),
146 ( Newsize == 0
147 -> debug(http(scheduler), 'Drained', [])
148 ; debug(http(scheduler), 'Size is ~w: adding worker', [Newsize]),
149 setting(http:worker_idle_limit, MaxIdle),
150 http_add_worker(Dict.port,
151 [ max_idle_time(MaxIdle)
152 ])
153 )
154 ).
155reschedule(update_load_avg, State0, State) :-
156 update_load_avg(_{}, State0, State, _).
157
158update_load_avg(_Dict, State, State, Load) :-
159 _{stamp:Last, load:Load} :< State.get(load),
160 get_time(Now),
161 Now - Last < 10.
162update_load_avg(Dict, State0, State, Load) :-
163 server_port(Dict, State0, State1, Port),
164 !,
165 aggregate_all(sum(CPU), worker_cpu(Port, CPU), CPU1),
166 get_time(Now),
167 ( LoadDict = State1.get(load),
168 _{stamp:Last, cpu:LastCPU} :< LoadDict
169 -> Load0 is (CPU1-LastCPU)/(Now-Last),
170 smooth_load(LoadDict, Load0, Load),
171 State = State1.put(load, _{stamp:Now, cpu:CPU1, load:Load})
172 ; State = State1.put(load, _{stamp:Now, cpu:CPU1}),
173 Load = 0
174 ).
175update_load_avg(_, _, _, 0).
176
177worker_cpu(Port, CPU) :-
178 http_current_worker(Port, Thread),
179 catch(thread_statistics(Thread, cputime, CPU), _, fail).
180
181server_port(_Dict, State, State, Port) :-
182 Port = State.get(port),
183 !.
184server_port(Dict, State0, State, Port) :-
185 Port = Dict.get(port),
186 State = State0.put(port, Port).
187
188smooth_load(LoadDict, Load0, Load) :-
189 OldLoad = LoadDict.get(load),
190 !,
191 Load is (5*OldLoad+Load0)/6.
192smooth_load(_, Load, Load).
199accept_queue(Dict, Queue) :-
200 Queue = Dict.get(queue),
201 !.
202accept_queue(Dict, Queue) :-
203 thread_httpd:current_server(Dict.port, _, _, Queue, _, _),
204 !
Dynamically schedule HTTP workers.
This module defines hooks into the HTTP framework to dynamically schedule worker threads. Dynamic scheduling relieves us from finding a good value for the size of the HTTP worker pool.
The decision to add a worker follows these rules:
The policy depends on three settings: