1 : /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
2 : Copyright (c) 2017-2023 The plumed team
3 : (see the PEOPLE file at the root of the distribution for a list of names)
4 :
5 : See http://www.plumed.org for more information.
6 :
7 : This file is part of plumed, version 2.
8 :
9 : plumed is free software: you can redistribute it and/or modify
10 : it under the terms of the GNU Lesser General Public License as published by
11 : the Free Software Foundation, either version 3 of the License, or
12 : (at your option) any later version.
13 :
14 : plumed is distributed in the hope that it will be useful,
15 : but WITHOUT ANY WARRANTY; without even the implied warranty of
16 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 : GNU Lesser General Public License for more details.
18 :
19 : You should have received a copy of the GNU Lesser General Public License
20 : along with plumed. If not, see <http://www.gnu.org/licenses/>.
21 : +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
22 : #ifndef __PLUMED_core_ParallelTaskManager_h
23 : #define __PLUMED_core_ParallelTaskManager_h
24 :
25 : #include "ActionWithVector.h"
26 : #include "ActionWithMatrix.h"
27 : #include "tools/Communicator.h"
28 : #include "tools/OpenMP.h"
29 : #include "tools/View.h"
30 : #include "tools/View2D.h"
31 :
32 : #include "tools/ColvarOutput.h"
33 : #include "tools/OpenACC.h"
34 :
35 : namespace PLMD {
36 :
37 : struct ArgumentsBookkeeping {
38 : std::size_t nargs{0};
39 : std::vector<std::size_t> ranks;
40 : std::vector<std::size_t> shapestarts;
41 : std::vector<std::size_t> shapedata;
42 : std::vector<std::size_t> ncols;
43 : std::vector<std::size_t> bookstarts;
44 : std::vector<std::size_t> booksizes;
45 : std::vector<std::size_t> bookeeping;
46 : std::vector<std::size_t> argstarts;
47 : void setupArguments( const ActionWithArguments* action );
48 : };
49 :
50 : inline
51 212807 : void ArgumentsBookkeeping::setupArguments( const ActionWithArguments* action ) {
52 212807 : nargs = action->getNumberOfArguments();
53 212807 : ranks.resize( nargs );
54 212807 : shapestarts.resize( nargs );
55 212807 : argstarts.resize( nargs );
56 : std::size_t s = 0;
57 : std::size_t ts = 0;
58 498831 : for(unsigned i=0; i<nargs; ++i) {
59 : Value* myarg = action->getPntrToArgument(i);
60 286024 : shapestarts[i] = ts;
61 286024 : ranks[i] = myarg->getRank();
62 286024 : ts += ranks[i];
63 286024 : argstarts[i] = s;
64 286024 : s += myarg->getNumberOfStoredValues();
65 : }
66 212807 : shapedata.resize( ts );
67 : ts = 0;
68 212807 : ncols.resize( nargs );
69 212807 : bookstarts.resize( nargs );
70 212807 : booksizes.resize( nargs );
71 : std::size_t nbook = 0;
72 498831 : for(unsigned i=0; i<nargs; ++i) {
73 : Value* myarg = action->getPntrToArgument(i);
74 549813 : for(unsigned j=0; j<ranks[i]; ++j) {
75 263789 : shapedata[ts] = myarg->getShape()[j];
76 263789 : ++ts;
77 : }
78 286024 : bookstarts[i] = nbook;
79 286024 : if( ranks[i]==1 ) {
80 197265 : ncols[i] = 1;
81 197265 : booksizes[i] = 2*myarg->getShape()[0];
82 88759 : } else if( ranks[i]==2 ) {
83 33097 : ncols[i] = myarg->getNumberOfColumns();
84 33097 : booksizes[i] = myarg->matrix_bookeeping.size();
85 : }
86 286024 : nbook += booksizes[i];
87 : }
88 212807 : bookeeping.resize( nbook );
89 : ts = 0;
90 498831 : for(unsigned i=0; i<nargs; ++i) {
91 : Value* myarg = action->getPntrToArgument(i);
92 286024 : if( ranks[i]==1 ) {
93 31484454 : for(unsigned j=0; j<myarg->getShape()[0]; ++j) {
94 31287189 : bookeeping[ts] = 1;
95 31287189 : bookeeping[ts+1] = 0;
96 31287189 : ts += 2;
97 : }
98 88759 : } else if( ranks[i]==2 ) {
99 1157638470 : for(unsigned j=0; j<myarg->matrix_bookeeping.size(); ++j) {
100 1157605373 : bookeeping[ts] = myarg->matrix_bookeeping[j];
101 1157605373 : ++ts;
102 : }
103 : }
104 : }
105 212807 : }
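// A worked illustration (hypothetical sizes, not taken from a real action) of the
// layout that setupArguments builds for an action with two arguments: a vector of
// length 5 and a matrix stored with 4 columns:
//
//   ArgumentsBookkeeping ab;
//   ab.setupArguments( action );
//   // ab.nargs       == 2
//   // ab.ranks       == {1, 2}          rank of each argument
//   // ab.shapestarts == {0, 1}          offset of each argument's shape in shapedata
//   // ab.shapedata   == {5, nrow, ncol} flattened shapes of the two arguments
//   // ab.argstarts   == {0, 5}          offset of each argument's stored values
//   // ab.ncols       == {1, 4}          stored columns (always 1 for a rank-1 argument)
//   // ab.booksizes[0] == 10             a rank-1 argument uses 2*shape[0] bookkeeping entries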
106 :
107 : struct ParallelActionsInput {
108 : /// Do we need to calculate the derivatives
109 : bool noderiv{false};
110 : /// Periodic boundary conditions
111 : const Pbc* pbc;
112 : /// The number of components the underlying action is computing
113 : std::size_t ncomponents{0};
114 : /// The number of scalars we are calculating for each task
115 : unsigned nscalars{0};
116 : /// The number of force scalars for each task
117 : unsigned nforcescalars{0};
118 : /// Number of derivatives for each scalar being calculated
119 : std::size_t nderivatives_per_scalar{0};
120 : /// The start of the thread unsafe forces
121 : unsigned threadunsafe_forces_start{0};
122 : /// This holds all the input data that is required to calculate all values for all tasks
123 : unsigned dataSize{0};
124 : double *inputdata{nullptr};
125 : /// Bookkeeping information for the arguments
126 : std::size_t nargs{0};
127 : std::size_t ranks_size{0};
128 : const std::size_t* ranks{nullptr};
129 : std::size_t shapestarts_size{0};
130 : const std::size_t* shapestarts{nullptr};
131 : std::size_t shapedata_size{0};
132 : const std::size_t* shapedata{nullptr};
133 : std::size_t ncols_size{0};
134 : const std::size_t* ncols{nullptr};
135 : std::size_t bookstarts_size{0};
136 : const std::size_t* bookstarts{nullptr};
137 : std::size_t booksizes_size{0};
138 : const std::size_t* booksizes{nullptr};
139 : std::size_t bookeeping_size{0};
140 : const std::size_t* bookeeping{nullptr};
141 : std::size_t argstarts_size{0};
142 : const std::size_t* argstarts{nullptr};
143 : static ParallelActionsInput create( const Pbc& box ) {
144 3468 : auto toret=ParallelActionsInput();
145 3415 : toret.pbc=&box;
146 : return toret;
147 : }
148 :
149 : //helper function to bring data to the device in a controlled way
150 : void toACCDevice()const {
151 : #pragma acc enter data copyin(this[0:1], noderiv, pbc[0:1],ncomponents, \
152 : nscalars, nderivatives_per_scalar, threadunsafe_forces_start, \
153 : dataSize, inputdata[0:dataSize])
154 : if (nargs>0) {
155 : #pragma acc enter data copyin( nargs, \
156 : ranks_size, ranks[0:ranks_size], \
157 : shapestarts_size, shapestarts[0:shapestarts_size], \
158 : shapedata_size, shapedata[0:shapedata_size], \
159 : ncols_size, ncols[0:ncols_size], \
160 : bookstarts_size, bookstarts[0:bookstarts_size], \
161 : booksizes_size, booksizes[0:booksizes_size], \
162 : bookeeping_size, bookeeping[0:bookeeping_size], \
163 : argstarts_size, argstarts[0:argstarts_size] \
164 : )
165 : }
166 : pbc->toACCDevice();
167 : }
168 : //helper function to remove data from the device in a controlled way
169 : void removeFromACCDevice() const {
170 : pbc->removeFromACCDevice();
171 : // assuming dataSize is not changed
172 : if (nargs>0) {
173 : #pragma acc exit data delete( \
174 : shapestarts[0:shapestarts_size], shapestarts_size, \
175 : shapedata[0:shapedata_size], shapedata_size, \
176 : ncols[0:ncols_size], ncols_size, \
177 : bookstarts[0:bookstarts_size], bookstarts_size, \
178 : booksizes[0:booksizes_size], booksizes_size, \
179 : bookeeping[0:bookeeping_size], bookeeping_size, \
180 : argstarts[0:argstarts_size], argstarts_size, \
181 : ranks[0:ranks_size], ranks_size, \
182 : nargs )
183 : }
184 :
185 : #pragma acc exit data delete( \
186 : inputdata[0:dataSize], dataSize, \
187 : threadunsafe_forces_start, nderivatives_per_scalar, \
188 : nscalars, ncomponents, pbc[0:1],noderiv,this[0:1])
189 : }
190 : /// Setup the arguments
191 : void setupArguments( const ArgumentsBookkeeping& ab );
192 : unsigned sizeOfFakeVals() const {
193 : return nscalars;
194 : }
195 : };
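// A minimal sketch (assuming a valid Pbc reference and an ActionWithArguments
// pointer) of how a ParallelActionsInput is typically prepared and mirrored on
// the accelerator; removeFromACCDevice must undo toACCDevice exactly:
//
//   ParallelActionsInput input = ParallelActionsInput::create( pbc );
//   ArgumentsBookkeeping ab;
//   ab.setupArguments( action );
//   input.setupArguments( ab );   // copies the raw pointers and sizes out of ab
//   input.toACCDevice();
//   // ... run device kernels that read from input ...
//   input.removeFromACCDevice();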
196 :
197 212807 : inline void ParallelActionsInput::setupArguments( const ArgumentsBookkeeping& ab ) {
198 212807 : nargs = ab.nargs;
199 212807 : ranks = ab.ranks.data();
200 212807 : ranks_size = ab.ranks.size();
201 212807 : shapestarts = ab.shapestarts.data();
202 212807 : shapestarts_size = ab.shapestarts.size();
203 212807 : shapedata = ab.shapedata.data();
204 212807 : shapedata_size = ab.shapedata.size();
205 212807 : ncols = ab.ncols.data();
206 212807 : ncols_size = ab.ncols.size();
207 212807 : bookstarts = ab.bookstarts.data();
208 212807 : bookstarts_size = ab.bookstarts.size();
209 212807 : booksizes = ab.booksizes.data();
210 212807 : booksizes_size = ab.booksizes.size();
211 212807 : bookeeping = ab.bookeeping.data();
212 212807 : bookeeping_size = ab.bookeeping.size();
213 212807 : argstarts = ab.argstarts.data();
214 212807 : argstarts_size = ab.argstarts.size();
215 212807 : }
216 :
217 : struct ArgumentBookeepingHolder {
218 : std::size_t rank;
219 : std::size_t ncols;
220 : std::size_t start;
221 : View<const std::size_t> shape;
222 : View<const std::size_t> bookeeping;
223 :
224 785272 : static ArgumentBookeepingHolder create ( std::size_t argno, const ParallelActionsInput& inp ) {
225 : return ArgumentBookeepingHolder{
226 785272 : inp.ranks[argno], // rank
227 785272 : inp.ncols[argno], // ncols
228 785272 : inp.argstarts[argno], // start
229 785272 : View<const std::size_t>(inp.shapedata + inp.shapestarts[argno], inp.ranks[argno] ), // shape
230 785272 : View<const std::size_t>(inp.bookeeping + inp.bookstarts[argno], inp.booksizes[argno] ) // bookeeping
231 785272 : };
232 : }
233 : };
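// For example, inside a kernel the bookkeeping for argument argno can be viewed
// as follows (a sketch; argno must be smaller than inp.nargs):
//
//   auto arg = ArgumentBookeepingHolder::create( argno, inp );
//   if( arg.rank==2 ) {
//     std::size_t nrows = arg.shape[0];   // first dimension of the matrix
//     std::size_t ncols = arg.ncols;      // number of stored columns per row
//   }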
234 :
235 : struct ParallelActionsOutput {
236 : View<double> values;
237 : View<double> derivatives;
238 : View<double> buffer;
239 :
240 : static ParallelActionsOutput create( std::size_t ncomp, double* v, std::size_t ndev, double* d, std::size_t nb, double* b ) {
241 : return ParallelActionsOutput{
242 : View{v,ncomp}, //values
243 : View{d,ndev}, // derivatives
244 : View{b,nb} // buffer
245 : };
246 : }
247 : };
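// Note that create only wraps raw pointers in views: the caller owns the value,
// derivative and workspace buffers and must keep them alive, as in this sketch
// taken from the task loops below (val_pos is the task's offset into the stash):
//
//   std::vector<double> derivatives( nderivatives_per_task ), buffer( workspace_size );
//   auto myout = ParallelActionsOutput::create( myinput.nscalars, value_stash.data()+val_pos,
//                                               nderivatives_per_task, derivatives.data(),
//                                               workspace_size, buffer.data() );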
248 :
249 : struct ForceIndexHolder {
250 : View<std::size_t> threadsafe_derivatives_end;
251 : View<std::size_t> tot_indices;
252 : View2D<std::size_t> indices;
253 :
254 : /// Constructs a ForceIndexHolder object.
255 : /// \param nc Number of components
256 : /// \param nd Number of derivatives per scalar
257 : /// \param ind Pointer to an array storing index data. It should have a size of at least 2*nc + nc*nd.
258 : static ForceIndexHolder create(const std::size_t nc,
259 : const std::size_t nd,
260 : std::size_t* ind ) {
261 : return ForceIndexHolder{
262 : View{ind,nc}, // threadsafe_derivatives_end
263 : View{ind+nc,nc}, // tot_indices
264 : View2D{ind+2*nc,nc,nd} // indices
265 : };
266 : }
267 : static ForceIndexHolder create(const ParallelActionsInput& inp,
268 : std::size_t* ind ) {
269 : return create(inp.ncomponents,
270 : inp.nderivatives_per_scalar,ind);
271 : }
272 :
273 : /// \brief Returns the number of indexes needed by a ForceIndexHolder
274 : /// for each scalar.
275 : ///
276 : /// \param nc Number of components
277 : /// \param nd Number of derivatives per scalar
278 : ///
279 : /// \return the number of indexes needed by a ForceIndexHolder
280 : /// for each scalar.
281 : static size_t indexesPerScalar(const std::size_t nc, const std::size_t nd) {
282 : return nc // threadsafe_derivatives_end
283 : + nc // tot_indices
284 : + nc*nd; // indices
285 : }
286 : static size_t indexesPerScalar(const ParallelActionsInput& inp) {
287 : return indexesPerScalar(inp.ncomponents,
288 : inp.nderivatives_per_scalar);
289 : }
290 : };
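// The holder does not own memory: the caller allocates one flat buffer of
// indexesPerScalar(nc,nd) elements and creates views into it. A sketch with
// made-up sizes (nc=2 components, nd=3 derivatives per scalar):
//
//   std::vector<std::size_t> ind( ForceIndexHolder::indexesPerScalar(2,3) );  // 2+2+6 = 10
//   auto holder = ForceIndexHolder::create( 2, 3, ind.data() );
//   // holder.threadsafe_derivatives_end -> ind[0..1]
//   // holder.tot_indices                -> ind[2..3]
//   // holder.indices                    -> ind[4..9], viewed as a 2x3 array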
291 :
292 : class ForceInput {
293 : public:
294 : View<double> force;
295 : View2D<double> deriv;
296 : static ForceInput create( std::size_t nv, double* f, std::size_t nd, double* d ) {
297 : return ForceInput{
298 : View{f,nv}, //force
299 : View2D{d,nv,nd} //deriv
300 : };
301 : }
302 : };
303 :
304 : // This is small enough that there is no need to pass it by reference.
305 : struct ForceOutput {
306 : // Note: the names may deserve inverting, or replacing with something clearer:
307 : // "thread_safe" holds per-thread contributions that still need to be reduced (hence it is NOT thread safe)
308 : View<double> thread_safe;
309 : // These are the forces that we promise will not provoke data races
310 : View<double> thread_unsafe;
311 : // Reminder: const T* is a pointer to a const T,
312 : // while T* const is a const pointer to a modifiable T
313 : static ForceOutput create(std::vector<double>& reduced, std::vector<double>& notReduced) {
314 : return ForceOutput{
315 : View{reduced.data(),reduced.size()}, // thread_safe
316 : View{notReduced.data(),notReduced.size()} // thread_unsafe
317 : };
318 : }
319 : static ForceOutput create(double* reduced, size_t rs, double* notReduce, size_t nrsz) {
320 : return ForceOutput{
321 : View{reduced,rs}, // thread_safe
322 : View{notReduce,nrsz} // thread_unsafe
323 : };
324 : }
325 : };
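// A sketch of the intended pairing with gatherThreads below: each OpenMP thread
// accumulates the racy contributions in its own omp_forces[t] buffer and then,
// inside a critical section, reduces them onto the tail of the global array:
//
//   #pragma omp critical
//   gatherThreads( ForceOutput::create( omp_forces[t], forcesForApply ) );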
326 :
327 : // namespace PTMUtils {
328 : // template<class, class = void>
329 : // constexpr bool has_gatherForces_custom = false;
330 : //
331 : // //this verifies that T has a method gatherForces_custom that can be called with this signature
332 : // template<class T>
333 : // constexpr bool has_gatherForces_custom <
334 : // T,
335 : // std::void_t<
336 : // decltype(T::gatherForces_custom(
337 : // std::declval<unsigned >(),
338 : // std::declval<size_t >(),
339 : // std::declval<size_t >(),
340 : // std::declval<const typename T::input_type & >(),
341 : // std::declval<const ParallelActionsInput& >(),
342 : // std::declval<View<unsigned> >(),
343 : // std::declval<double *>(),
344 : // std::declval<double *>(),
345 : // std::declval<View<double> >()
346 : // ))
347 : // >
348 : // > = true;
349 : //
350 : // template<class, class = void>
351 : // constexpr bool has_gatherForces_GPU = false;
352 : //
353 : // //this verifies that T has a method gatherForcesGPU that can be called with this signature
354 : // template<class T>
355 : // constexpr bool has_gatherForces_GPU <
356 : // T,
357 : // std::void_t<
358 : // decltype(T::gatherForcesGPU(
359 : // std::declval<unsigned >(),
360 : // std::declval<const typename T::input_type & >(),
361 : // std::declval<const ParallelActionsInput& >(),
362 : // std::declval<const ForceInput& >(),
363 : // std::declval<ForceOutput >()
364 : // ))
365 : // >
366 : // > = true;
367 : //
368 : // /// Uses T::virialSize if the template class defines it, otherwise defaults to 9
369 : // template<class, class=void>
370 : // constexpr size_t virialSize = 9;
371 : //
372 : // template<class T>
373 : // constexpr size_t virialSize<T, std::void_t<decltype(T::virialSize),
374 : // //this ensures that T::virialSize is a static member
375 : // std::enable_if_t<!std::is_member_pointer_v<decltype(&T::virialSize)>>
376 : // >>
377 : // = T::virialSize;
378 : // } //namespace PTMUtils
379 :
380 : template <class T>
381 : class ParallelTaskManager {
382 : public:
383 : using input_type= typename T::input_type;
384 : // static constexpr bool has_custom_gather=PTMUtils::has_gatherForces_custom<T>;
385 : // static constexpr bool has_GPU_gather=PTMUtils::has_gatherForces_GPU<T>;
386 : // static constexpr size_t virialSize = PTMUtils::virialSize<T>;
387 : private:
388 : /// The underlying action for which we are managing parallel tasks
389 : ActionWithVector* action;
390 : /// The MPI communicator
391 : Communicator& comm;
392 : /// Is this an action with a matrix
393 : bool ismatrix;
394 : /// True if not using MPI for parallelisation
395 : bool serial;
396 : /// Are we using OpenACC for parallelisation
397 : bool useacc;
398 : /// Number of derivatives calculated for each task
399 : std::size_t nderivatives_per_task;
400 : /// The number of forces on each thread
402 : std::size_t nthreaded_forces;
403 : /// This holds the values before we transfer them to the output values
404 : std::vector<double> value_stash;
405 : /// A temporary set of vectors for holding the forces on each thread
406 : std::vector<std::vector<double> > omp_forces;
407 : /// This struct is used to pass data between the parallel interface and the function caller
408 : ParallelActionsInput myinput;
409 : ArgumentsBookkeeping argumentsMap;
410 : // This holds the data for myinput that will be passed through myinput
411 : std::vector<double> input_buffer;
412 : /// The size of the temporary workspace that we use in performTask
413 : std::size_t workspace_size;
414 : /// This holds the data that the underlying action needs to do the calculation
415 : input_type actiondata;
416 : /// This is used internally to get the number of elements in the value stash
417 : std::size_t getValueStashSize() const ;
418 : /// This is used internally to gather the forces on the threads
419 : void gatherThreads( ForceOutput forces );
420 : public:
421 : static void registerKeywords( Keywords& keys );
422 : ParallelTaskManager(ActionWithVector* av);
423 : /// Setup the parallel task manager. The two arguments are:
424 : /// nder = number of derivatives per scalar
425 : /// nforce_ts = number of forces that are modified by multiple tasks
426 : void setupParallelTaskManager( std::size_t nder, std::size_t nforce_ts );
427 : /// Copy the data from the underlying colvar into this parallel action
428 : void setActionInput( const input_type& adata );
429 : /// Set the size of the workspace
430 : void setWorkspaceSize( std::size_t size );
431 : /// Get the action input so we can use it
432 : input_type& getActionInput();
433 : const input_type& getActionInput() const ;
434 : /// Is the calculation running in serial
435 : bool runInSerial() const {
436 9828 : return serial;
437 : }
438 : /// This runs all the tasks
439 : void runAllTasks();
440 : /// Apply the forces on the parallel object
441 : void applyForces( std::vector<double>& forcesForApply );
442 : /// This is used to gather forces that are thread safe
443 : static void gatherThreadSafeForces( const ParallelActionsInput& input,
444 : const ForceIndexHolder& force_indices,
445 : const ForceInput& fdata,
446 : View<double> forces );
447 : /// This is used to gather forces that are not thread safe
448 : static void gatherThreadUnsafeForces( const ParallelActionsInput& input,
449 : const ForceIndexHolder& force_indices,
450 : const ForceInput& fdata,
451 : View<double> forces );
452 : };
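// A minimal sketch of how an action drives this class; MyTaskCalculator is a
// hypothetical type that provides input_type, performTask, getForceIndices and
// getNumberOfValuesPerTask as used by the template code below:
//
//   ParallelTaskManager<MyTaskCalculator> ptm( this ); // in the Action constructor
//   ptm.setActionInput( mydata );                      // static input shared by all tasks
//   ptm.setupParallelTaskManager( nder, nforce_ts );   // sizes for derivatives/forces
//   ptm.runAllTasks();                                 // in calculate()
//   ptm.applyForces( forces );                         // in apply()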
453 :
454 : template <class T>
455 8043 : void ParallelTaskManager<T>::registerKeywords( Keywords& keys ) {
456 8043 : keys.addFlag("SERIAL",false,"do the calculation in serial. Do not parallelize");
457 8043 : keys.addFlag("USEGPU",false,"run this calculation on the GPU");
458 16086 : keys.addLinkInDocForFlag("USEGPU","gpu.md");
459 16086 : keys.addLinkInDocForFlag("SERIAL", "actions.md");
460 8043 : }
461 :
462 : template <class T>
463 3249 : ParallelTaskManager<T>::ParallelTaskManager(ActionWithVector* av):
464 3249 : action(av),
465 3249 : comm(av->comm),
466 3249 : ismatrix(false),
467 3249 : useacc(false),
468 3249 : nderivatives_per_task(0),
469 3249 : nthreaded_forces(0),
470 : myinput(ParallelActionsInput::create(av->getPbc())),
471 3249 : workspace_size(0) {
472 3249 : ActionWithMatrix* am=dynamic_cast<ActionWithMatrix*>(av);
473 3249 : if(am) {
474 428 : ismatrix=true;
475 : }
476 3249 : action->parseFlag("USEGPU",useacc);
477 : #ifdef __PLUMED_USE_OPENACC
478 : if( useacc ) {
479 : action->log.printf(" using GPU to calculate this action\n");
480 : }
481 : #else
482 3249 : if( useacc ) {
483 0 : action->error("cannot use USEGPU flag as PLUMED has not been compiled with openacc");
484 : }
485 : #endif
486 3249 : action->parseFlag("SERIAL",serial);
487 3249 : if( serial ) {
488 0 : action->log.printf(" not using MPI to parallelise this action\n");
489 : }
490 3249 : }
491 :
492 : template <class T>
493 246853 : std::size_t ParallelTaskManager<T>::getValueStashSize() const {
494 : std::size_t valuesize=0;
495 528657 : for(unsigned i=0; i<action->getNumberOfComponents(); ++i) {
496 281804 : const Value* mycomp = action->getConstPntrToComponent(i);
497 281804 : if( mycomp->hasDerivatives() ) {
498 9196 : valuesize += mycomp->getNumberOfStoredValues()*(1+action->getNumberOfDerivatives());
499 : } else {
500 272608 : valuesize += mycomp->getNumberOfStoredValues();
501 : }
502 : }
503 246853 : return valuesize;
504 : }
505 :
506 :
507 : template <class T>
508 34265 : void ParallelTaskManager<T>::setupParallelTaskManager( std::size_t nder,
509 : std::size_t nforce_ts ) {
510 34265 : plumed_massert( action->getNumberOfComponents()>0, "there should be some components wen you setup the index list" );
511 34265 : myinput.ncomponents = action->getNumberOfComponents();
512 34265 : unsigned ntasks=0;
513 34265 : action->getNumberOfTasks( ntasks );
514 34265 : myinput.nscalars = 0;
515 34265 : myinput.nforcescalars = 0;
516 75419 : for(unsigned i=0; i<myinput.ncomponents; ++i) {
517 41154 : if( (action->copyOutput(i))->hasDerivatives() ) {
518 2631 : myinput.nscalars += 1 + action->getNumberOfDerivatives();
519 2631 : myinput.nforcescalars += 1;
520 38523 : } else if( (action->copyOutput(i))->getRank()==1 ) {
521 18677 : myinput.nscalars += 1;
522 18677 : myinput.nforcescalars += 1;
523 19846 : } else if( (action->copyOutput(i))->getRank()==2 ) {
524 19846 : if( ntasks==(action->copyOutput(i))->getShape()[0] ) {
525 13398 : myinput.nscalars += (action->copyOutput(i))->getNumberOfColumns();
526 13398 : myinput.nforcescalars += (action->copyOutput(i))->getNumberOfColumns();
527 : } else {
528 6448 : myinput.nscalars += 1;
529 6448 : myinput.nforcescalars += 1;
530 : }
531 : }
532 : }
533 34265 : myinput.nderivatives_per_scalar = nder;
534 34265 : nderivatives_per_task = nder*myinput.nforcescalars;
535 34265 : value_stash.resize( getValueStashSize() );
536 34265 : myinput.threadunsafe_forces_start = action->getNumberOfForceDerivatives() - nforce_ts;
537 34265 : unsigned t=OpenMP::getNumThreads();
538 34265 : if( useacc ) {
539 : t = 1;
540 : }
541 34265 : omp_forces.resize(t);
542 102795 : for(unsigned i=0; i<t; ++i) {
543 68530 : omp_forces[i].resize(nforce_ts);
544 : }
545 34265 : }
546 :
547 : template <class T>
548 1193 : void ParallelTaskManager<T>::setActionInput( const input_type& adata ) {
549 748 : actiondata=adata;
550 1193 : }
551 :
552 : template <class T>
553 401242 : typename ParallelTaskManager<T>::input_type& ParallelTaskManager<T>::getActionInput() {
554 401242 : return actiondata;
555 : }
556 :
557 : template <class T>
558 811779 : const typename ParallelTaskManager<T>::input_type& ParallelTaskManager<T>::getActionInput() const {
559 811779 : return actiondata;
560 : }
561 :
562 : template <class T>
563 12459 : void ParallelTaskManager<T>::setWorkspaceSize( std::size_t size ) {
564 12459 : workspace_size = size;
565 12459 : }
566 :
567 : #ifdef __PLUMED_USE_OPENACC
568 : // Use the __PLUMED_USE_OPENACC_TASKSMINE macro to debug the PTM in a single file
569 : // so that compiling with a small modification will be faster (the PTM is included nearly everywhere)
570 : #ifndef __PLUMED_USE_OPENACC_TASKSMINE
571 : template <class T>
572 : void runAllTasksACC(typename T::input_type actiondata,
573 : ParallelActionsInput myinput,
574 : std::vector<double>& value_stash,
575 : const std::vector<unsigned> & partialTaskList,
576 : const unsigned nactive_tasks,
577 : const std::size_t nderivatives_per_task,
578 : const std::size_t workspace_size
579 : ) {
580 : auto myinput_acc = OpenACC::fromToDataHelper(myinput);
581 : auto actiondata_acc = OpenACC::fromToDataHelper(actiondata);
582 :
583 : //template type is deduced
584 : OpenACC::memoryManager vs{value_stash};
585 : auto value_stash_data = vs.devicePtr();
586 :
587 : OpenACC::memoryManager ptl{partialTaskList};
588 : auto partialTaskList_data = ptl.devicePtr();
589 :
590 : OpenACC::memoryManager<double> buff{workspace_size*nactive_tasks};
591 :
592 : auto buffer = buff.devicePtr();
593 : OpenACC::memoryManager<double> dev(nderivatives_per_task*nactive_tasks);
594 : auto derivatives = dev.devicePtr();
595 : #pragma acc parallel loop present(myinput, actiondata) \
596 : copyin(nactive_tasks, \
597 : nderivatives_per_task, \
598 : workspace_size)\
599 : deviceptr(derivatives, \
600 : partialTaskList_data, \
601 : value_stash_data, \
602 : buffer) \
603 : default(none)
604 : for(unsigned i=0; i<nactive_tasks; ++i) {
605 : std::size_t task_index = partialTaskList_data[i];
606 : std::size_t val_pos = task_index*myinput.nscalars;
607 : auto myout = ParallelActionsOutput::create (myinput.nscalars,
608 : value_stash_data+val_pos,
609 : nderivatives_per_task,
610 : derivatives+nderivatives_per_task*i,
611 : workspace_size,
612 : (workspace_size>0)?
613 : buffer+workspace_size*i
614 : :nullptr );
615 : // Calculate the stuff in the loop for this action
616 : T::performTask( task_index, actiondata, myinput, myout );
617 : }
618 : vs.copyFromDevice(value_stash.data());
619 : }
620 : #else
621 : template <class T>
622 : void runAllTasksACC(typename T::input_type actiondata,
623 : ParallelActionsInput myinput,
624 : std::vector<double>& value_stash,
625 : const std::vector<unsigned> & partialTaskList,
626 : const unsigned nactive_tasks,
627 : const std::size_t nderivatives_per_task,
628 : const std::size_t workspace_size
629 : ) ;
630 : #endif //__PLUMED_USE_OPENACC_TASKSMINE
631 : #endif //__PLUMED_USE_OPENACC
632 :
633 : template <class T>
634 212588 : void ParallelTaskManager<T>::runAllTasks() {
635 : // Get the list of active tasks
636 212588 : std::vector<unsigned> & partialTaskList( action->getListOfActiveTasks( action ) );
637 212588 : unsigned nactive_tasks=partialTaskList.size();
638 : // Get all the input data so we can broadcast it to the GPU
639 212588 : myinput.noderiv = true;
640 446673 : for(unsigned i=0; i<action->getNumberOfComponents(); ++i) {
641 240650 : if( (action->getConstPntrToComponent(i))->hasDerivatives() ) {
642 6565 : myinput.noderiv=false;
643 6565 : break;
644 : }
645 : }
646 212588 : action->getInputData( input_buffer );
647 212588 : myinput.dataSize = input_buffer.size();
648 212588 : myinput.inputdata = input_buffer.data();
649 : // Transfer all the bookkeeping information about the arguments
650 212588 : argumentsMap.setupArguments( action );
651 212588 : myinput.setupArguments( argumentsMap );
652 : // Reset the values at the start of the task loop
653 212588 : std::size_t totalvals=getValueStashSize();
654 212588 : if( value_stash.size()!=totalvals ) {
655 62 : value_stash.resize(totalvals);
656 : }
657 : std::fill (value_stash.begin(),value_stash.end(), 0.0);
658 212588 : if( useacc ) {
659 : #ifdef __PLUMED_USE_OPENACC
660 : if (comm.Get_rank()== 0) {// no multigpu shenanigans until this works
661 : runAllTasksACC<T>(
662 : actiondata,
663 : myinput,
664 : value_stash,
665 : partialTaskList,
666 : nactive_tasks,
667 : nderivatives_per_task,
668 : workspace_size
669 : );
670 : }
671 : comm.Bcast( value_stash.data(), value_stash.size(), 0);
672 : #else
673 0 : plumed_merror("cannot use USEGPU flag if PLUMED has not been compiled with openACC");
674 : #endif
675 : } else {
676 : // Get the MPI details
677 212588 : unsigned stride=comm.Get_size();
678 212588 : unsigned rank=comm.Get_rank();
679 212588 : if(serial) {
680 : stride=1;
681 : rank=0;
682 : }
683 :
684 : // Get number of threads for OpenMP
685 212588 : unsigned nt=OpenMP::getNumThreads();
686 212588 : if( nt*stride*10>nactive_tasks ) {
687 128031 : nt=nactive_tasks/stride/10;
688 : }
689 : if( nt==0 ) {
690 : nt=1;
691 : }
692 :
693 212588 : #pragma omp parallel num_threads(nt)
694 : {
695 : std::vector<double> buffer( workspace_size );
696 : std::vector<double> derivatives( nderivatives_per_task );
697 : #pragma omp for nowait
698 : for(unsigned i=rank; i<nactive_tasks; i+=stride) {
699 : std::size_t task_index = partialTaskList[i];
700 : std::size_t val_pos = task_index*myinput.nscalars;
701 : auto myout = ParallelActionsOutput::create ( myinput.nscalars,
702 : value_stash.data()+val_pos,
703 : nderivatives_per_task,
704 : derivatives.data(),
705 : workspace_size,
706 : buffer.data() );
707 : // Calculate the stuff in the loop for this action
708 : T::performTask( task_index, actiondata, myinput, myout );
709 : }
710 : }
711 : // MPI Gather everything
712 212588 : if( !serial ) {
713 212588 : comm.Sum( value_stash );
714 : }
715 : }
716 : // And transfer the values to the output values
717 212588 : action->transferStashToValues( partialTaskList, value_stash );
718 212588 : }
719 :
720 : #ifdef __PLUMED_USE_OPENACC
721 : // Use the __PLUMED_USE_OPENACC_FORCESMINE macro to debug the PTM in a single file
722 : // so that compiling with a small modification will be faster (the PTM is included nearly everywhere)
723 : #ifndef __PLUMED_USE_OPENACC_FORCESMINE
724 : template <class T>
725 : void applyForcesWithACC(PLMD::View<double> forcesForApply,
726 : typename T::input_type actiondata,
727 : ParallelActionsInput myinput,
728 : const std::vector<double>& value_stash,
729 : const std::vector<unsigned> & partialTaskList,
730 : const unsigned nactive_tasks,
731 : const std::size_t nderivatives_per_task,
732 : const std::size_t workspace_size
733 : ) {
734 : auto myinput_acc = OpenACC::fromToDataHelper(myinput);
735 : auto actiondata_acc = OpenACC::fromToDataHelper(actiondata);
736 :
737 : //template type is deduced
738 : OpenACC::memoryManager vs{value_stash};
739 : auto value_stash_data = vs.devicePtr();
740 :
741 : OpenACC::memoryManager ptl{partialTaskList};
742 : auto partialTaskList_data = ptl.devicePtr();
743 :
744 : OpenACC::memoryManager ffa {forcesForApply};
745 : auto forcesForApply_data = ffa.devicePtr();
746 : const auto forcesForApply_size = ffa.size();
747 : const auto nind_per_scalar = ForceIndexHolder::indexesPerScalar(myinput);
748 : // nscalars is >= ncomponents (see setupParallelTaskManager)
749 : const auto nind_per_task = nind_per_scalar*myinput.nscalars;
750 :
751 : OpenACC::memoryManager<double> dev{nderivatives_per_task*nactive_tasks};
752 : auto derivatives = dev.devicePtr();
753 : OpenACC::memoryManager<std::size_t> ind{nind_per_task*nactive_tasks};
754 : auto indices = ind.devicePtr();
755 : OpenACC::memoryManager<double> vtmp{myinput.sizeOfFakeVals()*nactive_tasks};
756 : auto valstmp = vtmp.devicePtr();
757 : OpenACC::memoryManager<double> buff{workspace_size*nactive_tasks};
758 : auto buffer = buff.devicePtr();
759 :
760 : #define forces_indicesArg(taskID,scalarID) ForceIndexHolder::create(myinput, \
761 : indices + taskID*nind_per_task + scalarID*nind_per_scalar)
762 : #define derivativeDrift(taskID,scalarID) taskID*nderivatives_per_task \
763 : + scalarID*myinput.ncomponents*myinput.nderivatives_per_scalar
764 : #define stashDrift(taskID,scalarID) taskID*myinput.nscalars \
765 : + scalarID*myinput.ncomponents
766 :
767 : #pragma acc data present(myinput,actiondata) \
768 : copyin(nactive_tasks, \
769 : forcesForApply_size, \
770 : nderivatives_per_task, nind_per_task,nind_per_scalar, \
771 : workspace_size) \
772 : deviceptr(derivatives, \
773 : indices, \
774 : value_stash_data, \
775 : partialTaskList_data, \
776 : forcesForApply_data, \
777 : valstmp, \
778 : buffer) \
779 : default(none)
780 : {
781 : #pragma acc parallel loop
782 : for(unsigned t=0; t<nactive_tasks; ++t) {
783 : std::size_t task_index = partialTaskList_data[t];
784 : auto myout = ParallelActionsOutput::create( myinput.nscalars,
785 : valstmp+myinput.nscalars*t,
786 : nderivatives_per_task,
787 : derivatives+nderivatives_per_task*t,
788 : workspace_size,
789 : (workspace_size>0)?buffer+workspace_size*t:nullptr);
790 : // Calculate the stuff in the loop for this action
791 : T::performTask( task_index, actiondata, myinput, myout );
792 : // If this is a matrix this returns a number greater than one, as we have to loop over the columns
793 : const std::size_t nvpt = T::getNumberOfValuesPerTask( task_index, actiondata );
794 : #pragma acc loop seq
795 : for(unsigned vID=0; vID<nvpt; ++vID) {
796 : auto force_indices = forces_indicesArg(t,vID);
797 : // Get the indices for the forces
799 : T::getForceIndices( task_index,
800 : vID,
801 : forcesForApply_size,
802 : actiondata,
803 : myinput,
804 : force_indices );
805 :
806 : // Create a force input object
807 : auto finput = ForceInput::create ( myinput.nscalars,
808 : value_stash_data + stashDrift(task_index,vID),
809 : myinput.nderivatives_per_scalar,
810 : derivatives + derivativeDrift(t,vID));
811 :
812 : // Gather forces that can be gathered locally
813 : ParallelTaskManager<T>::gatherThreadSafeForces( myinput,
814 : force_indices,
815 : finput,
816 : View<double>(forcesForApply_data,
817 : forcesForApply_size));
818 : }
819 : }
820 :
821 : #pragma acc parallel loop
822 : for(unsigned v=myinput.threadunsafe_forces_start; v<forcesForApply_size; ++v) {
823 : double tmp = 0.0;
824 : #pragma acc loop reduction(+:tmp)
825 : for(unsigned t=0; t<nactive_tasks; ++t) {
826 : const std::size_t task_index = partialTaskList_data[t];
827 : const std::size_t nvpt = T::getNumberOfValuesPerTask( task_index, actiondata );
828 : for(unsigned vID=0; vID<nvpt; ++vID) {
829 : auto force_indices = forces_indicesArg(t,vID);
830 :
831 : auto fdata = ForceInput::create( myinput.nscalars,
832 : value_stash_data + stashDrift(task_index,vID),
833 : myinput.nderivatives_per_scalar,
834 : derivatives + derivativeDrift(t,vID));
835 : for(unsigned i=0; i<myinput.ncomponents; ++i) {
836 : const double ff = fdata.force[i];
837 : for(unsigned d=force_indices.threadsafe_derivatives_end[i];
838 : d<force_indices.tot_indices[i]; ++d) {
839 : if( force_indices.indices[i][d]==v ) {
840 : tmp += ff*fdata.deriv[i][d];
841 : // break;
842 : }
843 : }
844 : }
845 : }
846 : }
847 : forcesForApply_data[v] = tmp;
848 : }
849 : }
850 : #undef forces_indicesArg
851 : #undef derivativeDrift
852 : #undef stashDrift
853 : ffa.copyFromDevice(forcesForApply.data());
854 : }
855 : #else
856 : template <class T>
857 : void applyForcesWithACC(PLMD::View<double> forcesForApply,
858 : typename T::input_type actiondata,
859 : ParallelActionsInput myinput,
860 : const std::vector<double>& value_stash,
861 : const std::vector<unsigned> & partialTaskList,
862 : const unsigned nactive_tasks,
863 : const std::size_t nderivatives_per_task,
864 : const std::size_t workspace_size
865 : );
866 : #endif //__PLUMED_USE_OPENACC_FORCESMINE
867 : #endif //__PLUMED_USE_OPENACC
868 : template <class T>
869 181507 : void ParallelTaskManager<T>::applyForces( std::vector<double>& forcesForApply ) {
870 : // Get the list of active tasks
871 181507 : std::vector<unsigned> & partialTaskList= action->getListOfActiveTasks( action ) ;
872 181507 : unsigned nactive_tasks=partialTaskList.size();
873 : // Clear force buffer
874 181507 : forcesForApply.assign( forcesForApply.size(), 0.0 );
875 : // TODO: check if std::fill is faster (I get conflicting answers on the net)
876 : //std::fill (forcesForApply.begin(),forcesForApply.end(), 0.0);
877 : // Get all the input data so we can broadcast it to the GPU
878 181507 : myinput.noderiv = false;
879 : // Retrieve the forces from the values
880 181507 : action->transferForcesToStash( partialTaskList, value_stash );
881 :
882 181507 : if( useacc ) {
883 : #ifdef __PLUMED_USE_OPENACC
884 : std::fill (omp_forces[0].begin(),omp_forces[0].end(), 0.0);
885 : if (comm.Get_rank() == 0) {
886 : applyForcesWithACC<T>(
887 : PLMD::View<double> { forcesForApply.data(), forcesForApply.size() },
888 : actiondata,
889 : myinput,
890 : value_stash,
891 : partialTaskList,
892 : nactive_tasks,
893 : nderivatives_per_task,
894 : workspace_size
895 : );
896 : }
897 : #else
898 0 : plumed_merror("cannot use USEGPU flag if PLUMED has not been compiled with openACC");
899 : #endif
900 : } else {
901 : // Get the MPI details
902 181507 : unsigned stride=comm.Get_size();
903 181507 : unsigned rank=comm.Get_rank();
904 181507 : if(serial) {
905 : stride=1;
906 : rank=0;
907 : }
908 :
909 : // Get number of threads for OpenMP
910 181507 : unsigned nt=OpenMP::getNumThreads();
911 181507 : if( nt*stride*10>nactive_tasks ) {
912 112207 : nt=nactive_tasks/stride/10;
913 : }
914 : if( nt==0 ) {
915 : nt=1;
916 : }
917 181507 : #pragma omp parallel num_threads(nt)
918 : {
919 : const unsigned t=OpenMP::getThreadNum();
920 : omp_forces[t].assign( omp_forces[t].size(), 0.0 );
921 : std::vector<double> buffer( workspace_size );
922 : std::vector<double> fake_vals( myinput.sizeOfFakeVals() );
923 : std::vector<double> derivatives( nderivatives_per_task );
924 : std::vector<std::size_t> indices(ForceIndexHolder::indexesPerScalar(myinput));
925 :
926 : auto force_indices = ForceIndexHolder::create( myinput,indices.data() );
927 : #pragma omp for nowait
928 : for(unsigned i=rank; i<nactive_tasks; i+=stride) {
929 : std::size_t task_index = partialTaskList[i];
930 : auto myout = ParallelActionsOutput::create( myinput.nscalars,
931 : fake_vals.data(),
932 : derivatives.size(),
933 : derivatives.data(),
934 : workspace_size,
935 : buffer.data() );
936 : // Calculate the stuff in the loop for this action
937 : T::performTask( task_index, actiondata, myinput, myout );
938 :
939 : // If this is a matrix this returns a number greater than one, as we have to loop over the columns
940 : const std::size_t nvpt = T::getNumberOfValuesPerTask( task_index, actiondata );
941 : for(unsigned j=0; j<nvpt; ++j) {
942 : // Get the force indices
943 : T::getForceIndices( task_index,
944 : j,
945 : forcesForApply.size(),
946 : actiondata,
947 : myinput,
948 : force_indices );
949 : // Create a force input object
950 : auto finput=ForceInput::create( myinput.nforcescalars,
951 : value_stash.data()
952 : + myinput.nforcescalars*task_index
953 : + j*myinput.ncomponents,
954 : myinput.nderivatives_per_scalar,
955 : derivatives.data()
956 : + j*myinput.ncomponents*myinput.nderivatives_per_scalar );
957 :
958 : // Gather forces that are thread safe
959 : gatherThreadSafeForces( myinput,
960 : force_indices,
961 : finput,
962 : View<double>(forcesForApply.data(),
963 : forcesForApply.size()) );
964 :
965 : // Gather forces that are not thread safe
966 : gatherThreadUnsafeForces( myinput,
967 : force_indices,
968 : finput,
969 : View<double>(omp_forces[t].data(),
970 : omp_forces[t].size()) );
971 : }
972 : }
973 :
974 : #pragma omp critical
975 : gatherThreads( ForceOutput::create(omp_forces[t], forcesForApply ) );
976 : }
977 : // MPI gather everything (this must be extended to the GPU path after making it MPI-aware)
978 181507 : if( !serial ) {
979 181507 : comm.Sum( forcesForApply );
980 : }
981 : }
982 181507 : }
983 :
984 : template <class T>
985 29091134 : void ParallelTaskManager<T>::gatherThreadSafeForces( const ParallelActionsInput& input,
986 : const ForceIndexHolder& force_indices,
987 : const ForceInput& fdata,
988 : View<double> forces ) {
989 129551003 : for(unsigned i=0; i<input.ncomponents; ++i) {
990 100459869 : double ff = fdata.force[i];
991 438317830 : for(unsigned j=0; j<force_indices.threadsafe_derivatives_end[i]; ++j) {
992 337857961 : forces[ force_indices.indices[i][j] ] += ff*fdata.deriv[i][j];
993 : }
994 : }
995 29091134 : }
996 :
997 : template <class T>
998 29091134 : void ParallelTaskManager<T>::gatherThreadUnsafeForces(const ParallelActionsInput& input,
999 : const ForceIndexHolder& force_indices,
1000 : const ForceInput& fdata,
1001 : View<double> forces ) {
1002 129551003 : for(unsigned i=0; i<input.ncomponents; ++i) {
1003 100459869 : const double ff = fdata.force[i];
1004 441682247 : for(unsigned d=force_indices.threadsafe_derivatives_end[i];
1005 441682247 : d<force_indices.tot_indices[i]; ++d) {
1006 341222378 : forces[ force_indices.indices[i][d] - input.threadunsafe_forces_start ]
1007 341222378 : += ff*fdata.deriv[i][d];
1008 : }
1009 : }
1010 29091134 : }
1011 :
1012 : template <class T>
1013 250807 : void ParallelTaskManager<T>::gatherThreads( ForceOutput forces ) {
1014 : // ForceOutput is basically two spans, so it is OK to pass it by value; the per-thread buffer (thread_safe) is reduced onto the tail of the global force array (thread_unsafe)
1015 : unsigned k=0;
1016 7612052 : for(unsigned n=forces.thread_unsafe.size()-forces.thread_safe.size(); n<forces.thread_unsafe.size(); ++n) {
1017 7361245 : forces.thread_unsafe[n] += forces.thread_safe[k];
1018 7361245 : ++k;
1019 : }
1020 250807 : }
1021 :
1022 : } // namespace PLMD
1023 : #endif
|