/*  Part of SWI-Prolog

    Author:        Jan Wielemaker
    E-mail:        J.Wielemaker@vu.nl
    WWW:           http://www.swi-prolog.org
    Copyright (c)  2007-2018, University of Amsterdam
                              VU University Amsterdam
    All rights reserved.

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions
    are met:

    1. Redistributions of source code must retain the above copyright
       notice, this list of conditions and the following disclaimer.

    2. Redistributions in binary form must reproduce the above copyright
       notice, this list of conditions and the following disclaimer in
       the documentation and/or other materials provided with the
       distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
    POSSIBILITY OF SUCH DAMAGE.
*/

:- module(thread,
          [ concurrent/3,               % +Threads, :Goals, +Options
            concurrent_maplist/2,       % :Goal, +List
            concurrent_maplist/3,       % :Goal, ?List1, ?List2
            concurrent_maplist/4,       % :Goal, ?List1, ?List2, ?List3
            first_solution/3            % -Var, :Goals, +Options
          ]).
:- use_module(library(debug)).
:- use_module(library(error)).
:- use_module(library(lists)).
:- use_module(library(apply)).
:- use_module(library(option)).

%:- debug(concurrent).

:- meta_predicate
    concurrent(+, :, +),
    concurrent_maplist(1, +),
    concurrent_maplist(2, ?, ?),
    concurrent_maplist(3, ?, ?, ?),
    first_solution(-, :, +).

:- predicate_options(concurrent/3, 3,
                     [ pass_to(system:thread_create/3, 3)
                     ]).
:- predicate_options(first_solution/3, 3,
                     [ on_fail(oneof([stop,continue])),
                       on_error(oneof([stop,continue])),
                       pass_to(system:thread_create/3, 3)
                     ]).

/** <module> High level thread primitives

This module defines easy-to-use predicates for running goals
concurrently. The core multi-threaded API is targeted at communicating
long-living threads. The predicates here instead run goals concurrently
without requiring the programmer to create and maintain threads
explicitly.

Note that these predicates run goals concurrently and therefore these
goals need to be thread-safe. The predicates in this module also abort
branches of the computation that are no longer needed, so predicates
that have side-effects must behave correctly when aborted. In a
nutshell, this has the following consequences:

  * Nice clean Prolog code without side-effects (but with cut) works
    fine.
  * Side-effects are bad news. If you really need assert to store
    intermediate results, use the thread_local/1 declaration. This
    also guarantees cleanup of left-over clauses if the thread is
    cancelled. For other side-effects, make sure to use call_cleanup/2
    to undo them should the thread be cancelled.
  * Global variables are ok as they are thread-local and destroyed
    on thread cancellation. Note however that global variables in
    the calling thread are *not* available in the threads that are
    created. You have to pass the value as an argument and initialise
    the variable in the new thread.
  * Thread-cancellation uses thread_signal/2. Using this code
    with long-blocking foreign predicates may result in long delays,
    even if another thread asks for cancellation.
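
For example, the sketch below keeps intermediate results in a
thread_local/1 predicate. The names count_distinct/2 and seen/1 are
hypothetical and are used only for illustration. A worker thread of
concurrent/3 may execute several goals in sequence. For that reason
the scratch clauses are removed at the start of each call, and the
code does not rely on the clauses disappearing when the thread ends:

==
:- thread_local seen/1.

% count_distinct(+List, -Count) (hypothetical): Count is the number
% of distinct elements in List, using seen/1 as per-thread scratch.
count_distinct(List, Count) :-
    retractall(seen(_)),            % drop leftovers of a previous job
    forall(member(X, List),
           (   seen(X)
           ->  true
           ;   assertz(seen(X))
           )),
    findall(X, seen(X), Xs),
    length(Xs, Count).
==

A call such as
`concurrent_maplist(count_distinct, [[a,b,a],[c,c,c]], Counts)` is
expected to bind Counts to `[2,1]`.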

@author Jan Wielemaker
*/

%!  concurrent(+N, :Goals, +Options) is semidet.
%
%   Run Goals in parallel using N threads. This call blocks until
%   all work has been done. The Goals must be independent. They
%   should not communicate using shared variables or any form of
%   global data. All Goals must be thread-safe.
%
%   Execution succeeds if all goals have succeeded. If one goal
%   fails or throws an exception, other workers are abandoned as
%   soon as possible and the entire computation fails or re-throws
%   the exception. Note that if multiple goals fail or raise an
%   error, it is not defined which error or failure is reported.
%
%   On successful completion, variable bindings are returned. Note
%   however that threads have independent stacks. The goal is
%   therefore copied to the worker thread, and the result is copied
%   back to the caller of concurrent/3.
%
%   Choosing the right number of threads is not always obvious. Here
%   are some scenarios:
%
%     * If the goals are CPU intensive and normally all succeed,
%       the number of CPUs is typically the optimal number of
%       threads. Fewer threads do not use all CPUs. More threads
%       waste time in context switches and also use more memory.
%
%     * If the tasks are I/O bound, the optimal number of threads is
%       typically higher than the number of CPUs.
%
%     * If one or more of the goals may fail or produce an error,
%       using a higher number of threads may detect this earlier.
%
%   @param N Number of worker-threads to create. If N is 1, no
%          threads are created and the goals are run one by one in
%          the calling thread. If N is larger than the number of
%          Goals, we create exactly as many threads as there are
%          Goals.
%   @param Goals List of callable terms.
%   @param Options Passed to thread_create/3 for creating the
%          workers. Only options changing the stack-sizes can
%          be used. In particular, do not pass the detached or alias
%          options.
%   @see In many cases, concurrent_maplist/2 and friends
%        are easier to program and more amenable to program
%        analysis.

concurrent(1, M:List, _) :-
    !,
    maplist(once_in_module(M), List).
concurrent(N, M:List, Options) :-
    must_be(positive_integer, N),
    must_be(list(callable), List),
    length(List, JobCount),
    message_queue_create(Done),
    message_queue_create(Queue),
    WorkerCount is min(N, JobCount),
    create_workers(WorkerCount, Queue, Done, Workers, Options),
    submit_goals(List, 1, M, Queue, VarList),
    forall(between(1, WorkerCount, _),
           thread_send_message(Queue, done)),
    VT =.. [vars|VarList],
    concur_wait(JobCount, Done, VT, cleanup(Workers, Queue),
                Result, [], Exitted),
    subtract(Workers, Exitted, RemainingWorkers),
    concur_cleanup(Result, RemainingWorkers, [Queue, Done]),
    (   Result == true
    ->  true
    ;   Result = false
    ->  fail
    ;   Result = exception(Error)
    ->  throw(Error)
    ).

once_in_module(M, Goal) :-
    call(M:Goal), !.

%!  submit_goals(+List, +Id0, +Module, +Queue, -Vars) is det.
%
%   Send all jobs from List to Queue. Each goal is added to Queue as
%   a term goal(Id, Goal, Vars). Vars is unified with a list of
%   lists of free variables appearing in each goal.

submit_goals([], _, _, _, []).
submit_goals([H|T], I, M, Queue, [Vars|VT]) :-
    term_variables(H, Vars),
    thread_send_message(Queue, goal(I, M:H, Vars)),
    I2 is I + 1,
    submit_goals(T, I2, M, Queue, VT).


%!  concur_wait(+N, +Done:queue, +VT:compound, +Cleanup,
%!              -Result, +Exitted0, -Exitted) is semidet.
%
%   Wait for completion, failure or error.
%
%   @arg Exitted List of thread-ids with threads that completed
%        before all work was done.

concur_wait(0, _, _, _, true, Exited, Exited) :- !.
concur_wait(N, Done, VT, Cleanup, Status, Exitted0, Exitted) :-
    debug(concurrent, 'Concurrent: waiting for workers ...', []),
    catch(thread_get_message(Done, Exit), Error,
          concur_abort(Error, Cleanup, Done, Exitted0)),
    debug(concurrent, 'Waiting: received ~p', [Exit]),
    (   Exit = done(Id, Vars)
    ->  debug(concurrent, 'Concurrent: Job ~p completed with ~p', [Id, Vars]),
        arg(Id, VT, Vars),
        N2 is N - 1,
        concur_wait(N2, Done, VT, Cleanup, Status, Exitted0, Exitted)
    ;   Exit = finished(Thread)
    ->  thread_join(Thread, JoinStatus),
        debug(concurrent, 'Concurrent: waiter ~p joined: ~p',
              [Thread, JoinStatus]),
        (   JoinStatus == true
        ->  concur_wait(N, Done, VT, Cleanup, Status, [Thread|Exitted0], Exitted)
        ;   Status = JoinStatus,
            Exitted = [Thread|Exitted0]
        )
    ).

concur_abort(Error, cleanup(Workers, Queue), Done, Exitted) :-
    debug(concurrent, 'Concurrent: got ~p', [Error]),
    subtract(Workers, Exitted, RemainingWorkers),
    concur_cleanup(Error, RemainingWorkers, [Queue, Done]),
    throw(Error).

create_workers(N, Queue, Done, [Id|Ids], Options) :-
    N > 0,
    !,
    thread_create(worker(Queue, Done), Id,
                  [ at_exit(thread_send_message(Done, finished(Id)))
                  | Options
                  ]),
    N2 is N - 1,
    create_workers(N2, Queue, Done, Ids, Options).
create_workers(_, _, _, [], _).


%!  worker(+WorkQueue, +DoneQueue) is det.
%
%   Process jobs from WorkQueue and send the results to DoneQueue.

worker(Queue, Done) :-
    thread_get_message(Queue, Message),
    debug(concurrent, 'Worker: received ~p', [Message]),
    (   Message = goal(Id, Goal, Vars)
    ->  (   Goal
        ->  thread_send_message(Done, done(Id, Vars)),
            worker(Queue, Done)
        )
    ;   true
    ).


%!  concur_cleanup(+Result, +Workers:list, +Queues:list) is det.
%
%   Cleanup the concurrent workers and message queues. If Result is
%   not =true=, signal all workers to make them stop prematurely. If
If 256% result is true we assume all workers have been instructed to 257% stop or have stopped themselves. 258 259concur_cleanup(Result, Workers, Queues) :- 260 !, 261 ( Result == true 262 -> true 263 ; kill_workers(Workers) 264 ), 265 join_all(Workers), 266 maplist(message_queue_destroy, Queues). 267 268kill_workers([]). 269kill_workers([Id|T]) :- 270 debug(concurrent, 'Signalling ~w', [Id]), 271 catch(thread_signal(Id, abort), _, true), 272 kill_workers(T). 273 274join_all([]). 275join_all([Id|T]) :- 276 thread_join(Id, _), 277 join_all(T). 278 279 280 /******************************* 281 * MAPLIST * 282 *******************************/ 283 284%! concurrent_maplist(:Goal, +List) is semidet. 285%! concurrent_maplist(:Goal, +List1, +List2) is semidet. 286%! concurrent_maplist(:Goal, +List1, +List2, +List3) is semidet. 287% 288% Concurrent version of maplist/2. This predicate uses concurrent/3, 289% using multiple _worker_ threads. The number of threads is the 290% minimum of the list length and the number of cores available. The 291% number of cores is determined using the prolog flag =cpu_count=. If 292% this flag is absent or 1 or List has less than two elements, this 293% predicate calls the corresponding maplist/N version using a wrapper 294% based on once/1. Note that all goals are executed as if wrapped in 295% once/1 and therefore these predicates are _semidet_. 296% 297% Note that the the overhead of this predicate is considerable and 298% therefore Goal must be fairly expensive before one reaches a 299% speedup. 300 301concurrent_maplist(Goal, List) :- 302 workers(List, WorkerCount), 303 !, 304 maplist(ml_goal(Goal), List, Goals), 305 concurrent(WorkerCount, Goals, []). 306concurrent_maplist(M:Goal, List) :- 307 maplist(once_in_module(M, Goal), List). 308 309once_in_module(M, Goal, Arg) :- 310 call(M:Goal, Arg), !. 311 312ml_goal(Goal, Elem, call(Goal, Elem)). 

concurrent_maplist(Goal, List1, List2) :-
    same_length(List1, List2),
    workers(List1, WorkerCount),
    !,
    maplist(ml_goal(Goal), List1, List2, Goals),
    concurrent(WorkerCount, Goals, []).
concurrent_maplist(M:Goal, List1, List2) :-
    maplist(once_in_module(M, Goal), List1, List2).

once_in_module(M, Goal, Arg1, Arg2) :-
    call(M:Goal, Arg1, Arg2), !.

ml_goal(Goal, Elem1, Elem2, call(Goal, Elem1, Elem2)).

concurrent_maplist(Goal, List1, List2, List3) :-
    same_length(List1, List2, List3),
    workers(List1, WorkerCount),
    !,
    maplist(ml_goal(Goal), List1, List2, List3, Goals),
    concurrent(WorkerCount, Goals, []).
concurrent_maplist(M:Goal, List1, List2, List3) :-
    maplist(once_in_module(M, Goal), List1, List2, List3).

once_in_module(M, Goal, Arg1, Arg2, Arg3) :-
    call(M:Goal, Arg1, Arg2, Arg3), !.

ml_goal(Goal, Elem1, Elem2, Elem3, call(Goal, Elem1, Elem2, Elem3)).

workers(List, Count) :-
    current_prolog_flag(cpu_count, Cores),
    Cores > 1,
    length(List, Len),
    Count is min(Cores,Len),
    Count > 1,
    !.

same_length([], [], []).
same_length([_|T1], [_|T2], [_|T3]) :-
    same_length(T1, T2, T3).


                 /*******************************
                 *             FIRST            *
                 *******************************/

%!  first_solution(-X, :Goals, +Options) is semidet.
%
%   Try alternative solvers concurrently, returning the first
%   answer. In a typical scenario, solving any of the goals in Goals
%   is satisfactory for the application to continue. As soon as one
%   of the tried alternatives is successful, all the others are
%   killed and first_solution/3 succeeds.
%
%   For example, if it is unclear whether it is better to search a
%   graph breadth-first or depth-first, we can use:
%
%   ==
%   search_graph(Graph, Path) :-
%            first_solution(Path, [ breadth_first(Graph, Path),
%                                   depth_first(Graph, Path)
%                                 ],
%                           []).
%   ==
%
%   Options include thread stack-sizes passed to thread_create/3, as
%   well as the options =on_fail= and =on_error=, which specify what
%   to do if a solver fails or raises an error. By default,
%   execution of all solvers is terminated and the result is
%   returned. Sometimes one may wish to continue. One such scenario
%   is when one of the solvers may run out of resources or one of the
%   solvers is known to be incomplete.
%
%     * on_fail(Action)
%       If =stop= (default), terminate all threads and stop with
%       the failure. If =continue=, keep waiting.
%     * on_error(Action)
%       As above, re-throwing the error if an error appears.
%
%   @bug first_solution/3 cannot deal with non-determinism. There
%   is no obvious way to fit non-determinism into it. If multiple
%   solutions are needed, wrap the solvers in findall/3.


first_solution(X, M:List, Options) :-
    message_queue_create(Done),
    thread_options(Options, ThreadOptions, RestOptions),
    length(List, JobCount),
    create_solvers(List, M, X, Done, Solvers, ThreadOptions),
    wait_for_one(JobCount, Done, Result, RestOptions),
    concur_cleanup(kill, Solvers, [Done]),
    (   Result = done(_, Var)
    ->  X = Var
    ;   Result = error(_, Error)
    ->  throw(Error)
    ).

create_solvers([], _, _, _, [], _).
create_solvers([H|T], M, X, Done, [Id|IDs], Options) :-
    thread_create(solve(M:H, X, Done), Id, Options),
    create_solvers(T, M, X, Done, IDs, Options).

solve(Goal, Var, Queue) :-
    thread_self(Me),
    (   catch(Goal, E, true)
    ->  (   var(E)
        ->  thread_send_message(Queue, done(Me, Var))
        ;   thread_send_message(Queue, error(Me, E))
        )
    ;   thread_send_message(Queue, failed(Me))
    ).

wait_for_one(0, _, failed, _) :- !.
wait_for_one(JobCount, Queue, Result, Options) :-
    thread_get_message(Queue, Msg),
    LeftCount is JobCount - 1,
    (   Msg = done(_, _)
    ->  Result = Msg
    ;   Msg = failed(_)
    ->  (   option(on_fail(stop), Options, stop)
        ->  Result = Msg
        ;   wait_for_one(LeftCount, Queue, Result, Options)
        )
    ;   Msg = error(_, _)
    ->  (   option(on_error(stop), Options, stop)
        ->  Result = Msg
        ;   wait_for_one(LeftCount, Queue, Result, Options)
        )
    ).


%!  thread_options(+Options, -ThreadOptions, -RestOptions) is det.
%
%   Split the option list into thread stack-size options and other
%   options.

thread_options([], [], []).
thread_options([H|T], [H|Th], O) :-
    thread_option(H),
    !,
    thread_options(T, Th, O).
thread_options([H|T], Th, [H|O]) :-
    thread_options(T, Th, O).

thread_option(local(_)).
thread_option(global(_)).
thread_option(trail(_)).
thread_option(argument(_)).
thread_option(stack(_)).
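
%   Example for first_solution/3 (a sketch; the two goals are
%   placeholders for real solvers). With on_fail(continue), a solver
%   that fails does not end the search. first_solution/3 keeps
%   waiting for the remaining solvers:
%
%   ==
%   ?- first_solution(X, [ member(X, []),              % always fails
%                          ( sleep(0.1), X = slow )
%                        ],
%                     [ on_fail(continue) ]).
%   X = slow.
%   ==
%
%   With the default on_fail(stop), the same query typically fails.
%   The first solver usually reports its failure before the second
%   one finishes, and that failure stops the whole computation.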