2 #ifndef DUNE_COMMUNICATOR
3 #define DUNE_COMMUNICATOR
/**
 * @brief Declaration of a static accessor that yields a read-only address
 * for the entry `index` of the container `v`.
 *
 * NOTE(review): the enclosing class header is outside this excerpt; the
 * MPI datatype code later in the file passes the result to MPI_Address.
 */
150 static const void*
getAddress(
const V& v,
int index);
/**
 * @brief Declaration of a static accessor returning an int size for the
 * entry `index` of a container (the container parameter is unnamed here).
 *
 * NOTE(review): presumably the number of primitive items stored at that
 * index - confirm against the definition, which is not part of this excerpt.
 */
157 static int getSize(
const V&,
int index);
// Forward declaration of the class template VariableBlockVector<B, A>, so
// that the code below can name it without its full definition.
162 template<
class B,
class A>
class VariableBlockVector;
// Fragment of a template taking <K, A, n>; the struct/class line between the
// template header and the typedef is not visible in this excerpt.
// `Type` names VariableBlockVector<FieldVector<K, n>, A>.
164 template<
class K,
class A,
int n>
167 typedef VariableBlockVector<FieldVector<K, n>, A>
Type;
/**
 * @brief Declarations of the static gather/scatter pair.
 *
 * gather() returns a const reference to an IndexedType for position `i` of
 * `vec`; scatter() takes a value `v` and a position `i` of a mutable `vec`.
 * NOTE(review): the bodies are not visible here; presumably a plain
 * copy-out / copy-in of vec[i] - confirm.
 */
192 static const IndexedType& gather(
const T& vec, std::size_t i);
194 static void scatter(T& vec,
const IndexedType& v, std::size_t i);
// Default constructor and destructor declarations of DatatypeCommunicator.
// The constructor definition later in the file initialises remoteIndices_
// to 0 and created_ to false.
242 DatatypeCommunicator();
247 ~DatatypeCommunicator();
/**
 * @brief Declaration of build(): sets up the communicator from remote index
 * information, two flag sets and the send/receive data containers.
 *
 * @param remoteIndices Remote index information (stored by pointer in the
 *                      definition).
 * @param sourceFlags   Flags describing the source side.
 * @param sendData      Container data is sent from.
 * @param destFlags     Flags describing the destination side.
 * @param receiveData   Container data is received into.
 */
275 template<
class T1,
class T2,
class V>
276 void build(
const RemoteIndices& remoteIndices,
const T1& sourceFlags, V& sendData,
const T2& destFlags, V& receiveData);
// Map from an int key to a pair of MPI datatypes. createDataTypes() stores
// the type built with send==true in `.first` and the other in `.second`;
// the key is used as the peer rank in MPI_Recv_init / MPI_Ssend_init.
// NOTE(review): the line naming this typedef is not visible in this excerpt.
305 typedef std::map<int,std::pair<MPI_Datatype,MPI_Datatype> >
311 MessageTypeMap messageTypes;
// Two request arrays: createRequests() fills index 1 when createForward is
// true and index 0 otherwise; forward() uses [1], backward() uses [0].
318 MPI_Request* requests_[2];
/**
 * @brief Declaration: creates the persistent MPI requests for one direction.
 *
 * @tparam V       Container type of the data.
 * @tparam FORWARD Selects which of the two request arrays is filled.
 * @param sendData    Container to send from.
 * @param receiveData Container to receive into.
 */
328 template<
class V,
bool FORWARD>
329 void createRequests(V& sendData, V& receiveData);
/**
 * @brief Declaration: builds the MPI datatypes describing the entries of
 * `data` that are exchanged with each process.
 *
 * @tparam send Selects whether the send or the receive datatype is created.
 * @param source      Flags of the source side.
 * @param destination Flags of the destination side.
 * @param data        Container whose entries are described.
 */
334 template<
class T1,
class T2,
class V,
bool send>
335 void createDataTypes(
const T1& source,
const T2& destination, V& data);
// Starts the persistent requests in `req` and waits for their completion
// (see the definition of DatatypeCommunicator<T>::sendRecv).
340 void sendRecv(MPI_Request* req);
// Fragment of IndexedTypeInformation. Other code in this file accesses its
// members displ, length, elements and size and calls build(size).
// NOTE(review): most of the struct body is not visible in this excerpt.
345 struct IndexedTypeInformation
// Allocates the displacement array with `i` MPI_Aint entries.
355 displ =
new MPI_Aint[i];
/**
 * @brief Collects, per process, the addresses and lengths of the entries of
 * a data container that take part in the communication.
 *
 * NOTE(review): braces and several lines of this struct are not visible in
 * this excerpt; comments describe only the visible statements.
 */
389 struct MPIDatatypeInformation
// Stores the data container passed in (member data_).
395 MPIDatatypeInformation(
const V& data): data_(data)
// Prepares the record of process `proc` for `size` entries.
403 void reserve(
int proc,
int size)
405 information_[proc].build(size);
// Records the entry with local index `local` for process `proc`.
413 void add(
int proc,
int local)
415 IndexedTypeInformation& info=information_[proc];
// More entries than reserved would overflow displ/length.
416 assert(info.elements<info.size);
// Absolute address of the entry; createDataTypes() later subtracts a base
// address to turn it into a displacement.
// NOTE(review): MPI_Address is deprecated since MPI-2 (MPI_Get_address).
417 MPI_Address( const_cast<void*>(CommPolicy<V>::getAddress(data_, local)),
418 info.displ+info.elements);
419 info.length[info.elements]=CommPolicy<V>::getSize(data_, local);
// Per-process records, keyed by the process number passed to reserve/add.
427 std::map<int,IndexedTypeInformation> information_;
// NOTE(review): the declaration belonging to this first template header is
// not visible in this excerpt.
461 template<
class Data,
class Interface>
/**
 * @brief Declaration of build() taking separate source and target
 * containers plus the interface describing what is communicated.
 */
472 template<
class Data,
class Interface>
473 void build(
const Data& source,
const Data& target,
const Interface& interface);
/**
 * @brief Declaration of forward(): communicates from `source` into `dest`.
 *
 * @tparam GatherScatter Policy providing the gather/scatter operations.
 * The definition forwards to sendRecv<GatherScatter,true>(source, dest).
 */
503 template<
class GatherScatter,
class Data>
504 void forward(
const Data& source, Data& dest);
/**
 * @brief Declaration of backward(): communication in the reverse direction;
 * here `source` is the mutable container and `dest` is const.
 *
 * The definition forwards to sendRecv<GatherScatter,false>(dest, source).
 */
534 template<
class GatherScatter,
class Data>
535 void backward(Data& source,
const Data& dest);
// NOTE(review): the declarations belonging to the next two template headers
// are not visible in this excerpt; presumably the single-container
// forward/backward overloads defined later in the file - confirm.
562 template<
class GatherScatter,
class Data>
590 template<
class GatherScatter,
class Data>
// Map from an int key to a pair of InterfaceInformation objects.
// NOTE(review): the line naming this typedef is not visible here; the code
// below refers to a type InterfaceMap, presumably this one.
608 typedef std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
// Functor computing the number of entries of a message; specialised on the
// IndexedTypeFlag (SizeOne / VariableSize). Bodies are not visible here.
615 template<
class Data,
typename IndexedTypeFlag>
616 struct MessageSizeCalculator
624 struct MessageSizeCalculator<Data,SizeOne>
649 struct MessageSizeCalculator<Data,VariableSize>
// Functor that gathers the data to send into a buffer; primary template
// followed by the SizeOne and VariableSize specialisations.
// NOTE(review): braces and further members are not visible in this excerpt.
665 template<
class Data,
class GatherScatter,
bool send,
typename IndexedTypeFlag>
666 struct MessageGatherer
673 template<
class Data,
class GatherScatter,
bool send>
674 struct MessageGatherer<Data,GatherScatter,send,SizeOne>
683 typedef GatherScatter Gatherer;
// Fills `buffer` from `data` for all entries listed in `interface`;
// `bufferSize` is only used for checking in the definitions below.
701 inline void operator()(
const InterfaceMap& interface,
const Data& data, Type* buffer,
size_t bufferSize)
const;
708 template<
class Data,
class GatherScatter,
bool send>
709 struct MessageGatherer<Data,GatherScatter,send,VariableSize>
718 typedef GatherScatter Gatherer;
// Same contract as above for entries holding several items each.
736 inline void operator()(
const InterfaceMap& interface,
const Data& data, Type* buffer,
size_t bufferSize)
const;
// Functor that scatters a received buffer into the data container; primary
// template followed by the SizeOne and VariableSize specialisations.
// NOTE(review): braces and further members are not visible in this excerpt.
742 template<
class Data,
class GatherScatter,
bool send,
typename IndexedTypeFlag>
743 struct MessageScatterer
750 template<
class Data,
class GatherScatter,
bool send>
751 struct MessageScatterer<Data,GatherScatter,send,SizeOne>
760 typedef GatherScatter Scatterer;
// Writes the items in `buffer` that came from process `proc` into `data`.
778 inline void operator()(
const InterfaceMap& interface, Data& data, Type* buffer,
const int& proc)
const;
784 template<
class Data,
class GatherScatter,
bool send>
785 struct MessageScatterer<Data,GatherScatter,send,VariableSize>
794 typedef GatherScatter Scatterer;
// Same contract as above for entries holding several items each.
812 inline void operator()(
const InterfaceMap& interface, Data& data, Type* buffer,
const int& proc)
const;
// Describes one message inside a communication buffer by the two values
// start_ and size_. In build(), start_ is set from the running entry count
// and size_ from count*sizeof(IndexedType), i.e. size_ is in bytes.
818 struct MessageInformation
// Default constructor initialiser list: empty message at offset 0.
822 : start_(0), size_(0)
832 MessageInformation(
size_t start,
size_t size)
833 :start_(start), size_(size)
// Per-process pair of message descriptions; build() inserts the send
// description as `.first` and the receive description as `.second`.
// NOTE(review): the line naming this typedef is not visible here.
851 typedef std::map<int,std::pair<MessageInformation,MessageInformation> >
856 InformationMap messageInformation_;
// Sizes of the two buffers: [0] is built from the send counts, [1] from the
// receive counts.
864 size_t bufferSize_[2];
// Copy of the interface information taken in build().
876 std::map<int,std::pair<InterfaceInformation,InterfaceInformation> > interfaces_;
// Communicator taken from the interface in build().
878 MPI_Comm communicator_;
/**
 * @brief Declaration of the worker behind forward()/backward(): gathers
 * from `source`, exchanges the buffers and scatters into `target`.
 *
 * @tparam FORWARD Direction of the communication.
 */
883 template<
class GatherScatter,
bool FORWARD,
class Data>
884 void sendRecv(
const Data& source, Data& target);
/**
 * @brief Address of the first item of block `index` of a
 * VariableBlockVector<FieldVector<K, n>, A>.
 *
 * @param v     The vector.
 * @param index Index of the block.
 * @return Pointer to v[index][0].
 */
902 template<
class K,
class A,
int n>
903 inline const void* CommPolicy<VariableBlockVector<FieldVector<K, n>, A> >::getAddress(
const Type& v,
int index)
905 return &(v[index][0]);
/**
 * @brief Size of block `index` of a VariableBlockVector<FieldVector<K, n>, A>.
 *
 * @param v     The vector.
 * @param index Index of the block.
 * @return The value of v[index].getsize().
 */
908 template<
class K,
class A,
int n>
909 inline int CommPolicy<VariableBlockVector<FieldVector<K, n>, A> >::getSize(
const Type& v,
int index)
911 return v[index].getsize();
// Definition header of CopyGatherScatter<T>::gather(vec, i); returns a
// const reference to an IndexedType.
// NOTE(review): the template header and the body are not visible here.
915 inline const typename CopyGatherScatter<T>::IndexedType& CopyGatherScatter<T>::gather(
const T& vec, std::size_t i)
// Definition header of CopyGatherScatter<T>::scatter(vec, v, i).
// NOTE(review): the template header and the body are not visible here.
921 inline void CopyGatherScatter<T>::scatter(T& vec,
const IndexedType& v, std::size_t i)
// Constructor: no remote indices attached yet (null pointer) and nothing
// created.
927 DatatypeCommunicator<T>::DatatypeCommunicator()
928 : remoteIndices_(0), created_(false)
// Destructor. NOTE(review): the body is not visible in this excerpt;
// presumably releases what build() created - confirm.
937 DatatypeCommunicator<T>::~DatatypeCommunicator()
/**
 * @brief Sets up datatypes and persistent requests for both directions.
 *
 * Stores the address of `remoteIndices`, creates the receive datatypes
 * (send==false) on `receiveData` and the send datatypes (send==true) on
 * `sendData`, then creates the forward requests (sendData -> receiveData)
 * and the backward requests (receiveData -> sendData).
 *
 * NOTE(review): `remoteIndices` is kept by pointer, so it has to outlive
 * this communicator's use of it.
 */
943 template<
class T1,
class T2,
class V>
944 inline void DatatypeCommunicator<T>::build(
const RemoteIndices& remoteIndices,
945 const T1& source, V& sendData,
946 const T2& destination, V& receiveData)
948 remoteIndices_ = &remoteIndices;
950 createDataTypes<T1,T2,V,false>(source,destination, receiveData);
951 createDataTypes<T1,T2,V,true>(source,destination, sendData);
952 createRequests<V,true>(sendData, receiveData);
953 createRequests<V,false>(receiveData, sendData);
/**
 * @brief Releases the request arrays and the stored MPI datatypes.
 *
 * NOTE(review): several lines (including the calls that free each datatype
 * and any guard around this code) are not visible in this excerpt.
 */
958 void DatatypeCommunicator<T>::free()
961 delete[] requests_[0];
962 delete[] requests_[1];
963 typedef MessageTypeMap::iterator
iterator;
964 typedef MessageTypeMap::const_iterator const_iterator;
966 const const_iterator end=messageTypes.end();
968 for(iterator process = messageTypes.begin(); process != end; ++process){
// First datatype of the pair (the one created with send==true).
969 MPI_Datatype *type = &(process->second.first);
// Datatypes are only touched while MPI has not been finalized.
// NOTE(review): this query is loop-invariant and could be hoisted.
972 MPI_Finalized(&finalized);
974 if(*type!=MPI_DATATYPE_NULL && !finalized)
// Second datatype of the pair.
976 type = &(process->second.second);
977 if(*type!=MPI_DATATYPE_NULL && !finalized)
980 messageTypes.clear();
/**
 * @brief Builds and commits one MPI datatype per remote process describing
 * the entries of `data` that are communicated.
 *
 * @tparam send If true the datatype is stored as the first element of the
 *              pair in messageTypes, otherwise as the second.
 *
 * NOTE(review): MPI_Address and MPI_Type_hindexed are deprecated since
 * MPI-2 and removed in MPI-3.0 (replacements: MPI_Get_address,
 * MPI_Type_create_hindexed). Some lines of this function are not visible.
 */
987 template<
class T1,
class T2,
class V,
bool send>
988 void DatatypeCommunicator<T>::createDataTypes(
const T1& sourceFlags,
const T2& destFlags, V& data)
// Collect addresses and lengths of all entries per process.
991 MPIDatatypeInformation<V> dataInfo(data);
992 this->
template buildInterface<RemoteIndices,T1,T2,MPIDatatypeInformation<V>,send>(*remoteIndices_,sourceFlags, destFlags, dataInfo);
994 typedef typename RemoteIndices::RemoteIndexMap::const_iterator const_iterator;
995 const const_iterator end=this->remoteIndices_->end();
998 for(const_iterator process=this->remoteIndices_->begin(); process != end; ++process){
999 IndexedTypeInformation& info=dataInfo.information_[process->first];
// Address of entry 0 serves as the base of all displacements.
1002 MPI_Address(const_cast<void *>(CommPolicy<V>::getAddress(data, 0)), &base);
// Turn the absolute addresses into displacements relative to the base.
1004 for(
int i=0; i< info.elements; i++){
1005 info.displ[i]-=base;
1009 MPI_Datatype* type = &( send ? messageTypes[process->first].first : messageTypes[process->first].second);
1010 MPI_Type_hindexed(info.elements, info.length, info.displ,
1011 MPITraits<
typename CommPolicy<V>::IndexedType>::getType(),
1013 MPI_Type_commit(type);
/**
 * @brief Creates the persistent receive and send requests for one direction.
 *
 * The request array has 2*noMessages slots; the receive requests are
 * created in the first loop and the send requests in the second.
 *
 * NOTE(review): declarations of `rank`, `request` and `address` are not
 * visible in this excerpt. `index` is a function-local static; each
 * template instantiation has its own copy, so a plain const would do.
 */
1019 template<
typename T>
1020 template<
class V,
bool createForward>
1021 void DatatypeCommunicator<T>::createRequests(V& sendData, V& receiveData)
1023 typedef std::map<int,std::pair<MPI_Datatype,MPI_Datatype> >::const_iterator MapIterator;
// Slot 1 holds the forward requests, slot 0 the backward ones.
1025 static int index = createForward?1:0;
1026 int noMessages = messageTypes.size();
1028 requests_[index] =
new MPI_Request[2*noMessages];
1029 const MapIterator end = messageTypes.end();
1031 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
// Receives: forward communication receives with the second datatype of the
// pair, backward communication with the first.
1034 for(MapIterator process = messageTypes.begin(); process != end;
1035 ++process, ++request){
1036 MPI_Datatype type = createForward ? process->second.second : process->second.first;
1038 MPI_Recv_init(address, 1, type, process->first, commTag_, this->remoteIndices_->communicator(), requests_[index]+request);
// Synchronous sends: the datatype choice mirrors the receive side.
1043 for(MapIterator process = messageTypes.begin(); process != end;
1044 ++process, ++request){
1045 MPI_Datatype type = createForward ? process->second.first : process->second.second;
1047 MPI_Ssend_init(address, 1, type, process->first, commTag_, this->remoteIndices_->communicator(), requests_[index]+request);
// Forward communication: runs the requests stored in slot 1, which
// createRequests() fills when createForward is true.
1051 template<
typename T>
1052 void DatatypeCommunicator<T>::forward()
1054 sendRecv(requests_[1]);
// Backward communication: runs the requests stored in slot 0, which
// createRequests() fills when createForward is false.
1057 template<
typename T>
1058 void DatatypeCommunicator<T>::backward()
1060 sendRecv(requests_[0]);
/**
 * @brief Starts the persistent requests and waits for their completion.
 *
 * @param requests Array of 2*noMessages persistent requests; the first
 *                 noMessages entries are receives, the rest are sends.
 *
 * Errors reported by MPI_Waitall are printed to std::cerr together with
 * the MPI error string. The result is combined over all processes with
 * MPI_Allreduce and a CommunicationError is thrown.
 *
 * NOTE(review): several lines (braces, declarations of rank / message /
 * messageLength, the condition guarding the throw) are not visible in this
 * excerpt and are left untouched.
 */
1063 template<
typename T>
1064 void DatatypeCommunicator<T>::sendRecv(MPI_Request* requests)
1066 int noMessages = messageTypes.size();
// Start the receives first ...
1068 MPI_Startall(noMessages, requests);
// ... then the sends.
1070 MPI_Startall(noMessages, requests+noMessages);
1073 MPI_Status* status=
new MPI_Status[2*noMessages];
// Preset so that untouched entries do not look like errors afterwards.
1074 for(
int i=0; i<2*noMessages; i++)
1075 status[i].MPI_ERROR=MPI_SUCCESS;
1077 int send = MPI_Waitall(noMessages, requests+noMessages, status+noMessages);
1078 int receive = MPI_Waitall(noMessages, requests, status);
1081 int success=1, globalSuccess=0;
1082 if(send==MPI_ERR_IN_STATUS){
1084 MPI_Comm_rank(this->remoteIndices_->communicator(), &rank);
1085 std::cerr<<rank<<
": Error in sending :"<<std::endl;
// The send statuses live in the second half of the status array.
1087 for(
int i=noMessages; i< 2*noMessages; i++)
1088 if(status[i].MPI_ERROR!=MPI_SUCCESS){
1091 MPI_Error_string(status[i].MPI_ERROR, message, &messageLength);
1092 std::cerr<<
" source="<<status[i].MPI_SOURCE<<
" message: ";
// Print the error text to std::cerr like the rest of the diagnostic (it
// used to go to std::cout); a separate index avoids shadowing the outer i.
1093 for(
int j=0; j< messageLength; j++)
1094 std::cerr<<message[j];
1096 std::cerr<<std::endl;
1100 if(receive==MPI_ERR_IN_STATUS){
1102 MPI_Comm_rank(this->remoteIndices_->communicator(), &rank);
1103 std::cerr<<rank<<
": Error in receiving!"<<std::endl;
// The receive statuses live in the first half of the status array.
1105 for(
int i=0; i< noMessages; i++)
1106 if(status[i].MPI_ERROR!=MPI_SUCCESS){
1109 MPI_Error_string(status[i].MPI_ERROR, message, &messageLength);
1110 std::cerr<<
" source="<<status[i].MPI_SOURCE<<
" message: ";
1111 for(
int j=0; j< messageLength; j++)
1112 std::cerr<<message[j];
1114 std::cerr<<std::endl;
// Minimum over all processes: the result is 0 as soon as one process
// contributed a 0.
1118 MPI_Allreduce(&success, &globalSuccess, 1, MPI_INT, MPI_MIN, this->remoteIndices_->communicator());
1123 DUNE_THROW(CommunicationError,
"A communication error occurred!");
/**
 * @brief Buffer set-up for data whose IndexedTypeFlag is SizeOne (selected
 * through enable_if on the return type).
 *
 * Copies the interface information and communicator, computes for each
 * process the number of entries to send and receive, records where each
 * message starts and how many bytes it has, and allocates both buffers.
 *
 * NOTE(review): the line with the function name/parameters and the reset
 * of bufferSize_ are not visible in this excerpt.
 */
1135 template<
class Data,
class Interface>
1136 typename enable_if<is_same<SizeOne, typename CommPolicy<Data>::IndexedTypeFlag>::value,
void>::type
1139 interfaces_=interface.interfaces();
1140 communicator_=interface.communicator();
1141 typedef typename std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
1142 ::const_iterator const_iterator;
1143 typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1144 const const_iterator end = interfaces_.end();
1146 MPI_Comm_rank(communicator_, &lrank);
1151 for(const_iterator interfacePair = interfaces_.begin();
1152 interfacePair != end; ++interfacePair){
1153 int noSend = MessageSizeCalculator<Data,Flag>()(interfacePair->second.first);
1154 int noRecv = MessageSizeCalculator<Data,Flag>()(interfacePair->second.second);
// start_ is the running entry count (in items), size_ is in bytes.
1155 messageInformation_.insert(std::make_pair(interfacePair->first,
1156 std::make_pair(MessageInformation(bufferSize_[0],
1157 noSend*
sizeof(
typename CommPolicy<Data>::IndexedType)),
1158 MessageInformation(bufferSize_[1],
1159 noRecv*
sizeof(
typename CommPolicy<Data>::IndexedType)))));
1160 bufferSize_[0] += noSend;
1161 bufferSize_[1] += noRecv;
// Convert the accumulated entry counts into byte sizes.
1165 bufferSize_[0] *=
sizeof(
typename CommPolicy<Data>::IndexedType);
1166 bufferSize_[1] *=
sizeof(
typename CommPolicy<Data>::IndexedType);
1168 buffers_[0] =
new char[bufferSize_[0]];
1169 buffers_[1] =
new char[bufferSize_[1]];
/**
 * @brief Buffer set-up variant that takes the data containers into account
 * when computing the message sizes (uses `source` for the send side and
 * `dest` for the receive side).
 *
 * NOTE(review): the line with the function name/parameters and the reset
 * of bufferSize_ are not visible in this excerpt.
 */
1172 template<
class Data,
class Interface>
1176 interfaces_=interface.interfaces();
1177 communicator_=interface.communicator();
1178 typedef typename std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
1179 ::const_iterator const_iterator;
1180 typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1181 const const_iterator end = interfaces_.end();
1186 for(const_iterator interfacePair = interfaces_.begin();
1187 interfacePair != end; ++interfacePair){
1188 int noSend = MessageSizeCalculator<Data,Flag>()(source, interfacePair->second.first);
1189 int noRecv = MessageSizeCalculator<Data,Flag>()(dest, interfacePair->second.second);
// start_ is the running entry count (in items), size_ is in bytes.
1191 messageInformation_.insert(std::make_pair(interfacePair->first,
1192 std::make_pair(MessageInformation(bufferSize_[0],
1193 noSend*
sizeof(
typename CommPolicy<Data>::IndexedType)),
1194 MessageInformation(bufferSize_[1],
1195 noRecv*
sizeof(
typename CommPolicy<Data>::IndexedType)))));
1196 bufferSize_[0] += noSend;
1197 bufferSize_[1] += noRecv;
// Convert the accumulated entry counts into byte sizes.
1200 bufferSize_[0] *=
sizeof(
typename CommPolicy<Data>::IndexedType);
1201 bufferSize_[1] *=
sizeof(
typename CommPolicy<Data>::IndexedType);
1203 buffers_[0] =
new char[bufferSize_[0]];
1204 buffers_[1] =
new char[bufferSize_[1]];
// Body fragment that drops the message bookkeeping and releases both
// buffers, resetting the pointers to 0 afterwards.
// NOTE(review): the function header is not visible; presumably
// BufferedCommunicator::free() - confirm.
1209 messageInformation_.clear();
1211 delete[] buffers_[0];
1214 delete[] buffers_[1];
1215 buffers_[0]=buffers_[1]=0;
// Size calculation for SizeOne data from the interface information alone.
// NOTE(review): the body is not visible in this excerpt.
1223 template<
class Data>
1224 inline int BufferedCommunicator::MessageSizeCalculator<Data,SizeOne>::operator()
1225 (
const InterfaceInformation& info)
const
// Overload taking the data container as well; for SizeOne data the
// container is irrelevant, so this forwards to the overload above.
1231 template<
class Data>
1232 inline int BufferedCommunicator::MessageSizeCalculator<Data,SizeOne>::operator()
1233 (
const Data& data,
const InterfaceInformation& info)
const
1235 return operator()(info);
// Size calculation for VariableSize data: adds up the size reported by
// CommPolicy<Data>::getSize for every index listed in `info`.
// NOTE(review): declaration and return of `entries` are not visible here.
1239 template<
class Data>
1240 inline int BufferedCommunicator::MessageSizeCalculator<Data, VariableSize>::operator()
1241 (
const Data& data,
const InterfaceInformation& info)
const
1245 for(
size_t i=0; i < info.size(); i++)
1246 entries += CommPolicy<Data>::getSize(data,info[i]);
/**
 * @brief Gathers all items to send into `buffer` for data whose entries
 * hold a variable number of items.
 *
 * @param interfaces Interface information per process.
 * @param data       Container the items are read from.
 * @param buffer     Destination buffer, filled consecutively.
 * @param bufferSize Size of the buffer in bytes; only used by the check
 *                   under DUNE_ISTL_WITH_CHECKING.
 *
 * For FORWARD communication the first InterfaceInformation of each pair is
 * used, otherwise the second.
 *
 * Fix: the direction was selected with the identifier `forward`, which is
 * not the template parameter (FORWARD). This now matches the SizeOne
 * specialisation.
 *
 * NOTE(review): some lines (braces, declarations of rank / index, the
 * matching #endif) are not visible in this excerpt and are left untouched.
 */
1252 template<
class Data,
class GatherScatter,
bool FORWARD>
1253 inline void BufferedCommunicator::MessageGatherer<Data,GatherScatter,FORWARD,VariableSize>::operator()(
const InterfaceMap& interfaces,
const Data& data, Type* buffer,
size_t bufferSize)
const
1255 typedef typename InterfaceMap::const_iterator
1259 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1260 const const_iterator end = interfaces.end();
1263 for(const_iterator interfacePair = interfaces.begin();
1264 interfacePair != end; ++interfacePair){
1265 int size = FORWARD ? interfacePair->second.first.size() :
1266 interfacePair->second.second.size();
1268 for(
int i=0; i < size; i++){
1269 int local = FORWARD ? interfacePair->second.first[i] :
1270 interfacePair->second.second[i];
// One buffer slot per item stored at the local index.
1271 for(std::size_t j=0; j < CommPolicy<Data>::getSize(data, local);j++, index++){
1273 #ifdef DUNE_ISTL_WITH_CHECKING
1274 assert(bufferSize>=(index+1)*
sizeof(
typename CommPolicy<Data>::IndexedType));
1276 buffer[index]=GatherScatter::gather(data, local, j);
/**
 * @brief Gathers all items to send into `buffer` for data with exactly one
 * item per entry.
 *
 * For FORWARD communication the first InterfaceInformation of each pair is
 * used, otherwise the second; items are stored consecutively.
 *
 * NOTE(review): declarations of rank / index and some braces are not
 * visible in this excerpt.
 */
1285 template<
class Data,
class GatherScatter,
bool FORWARD>
1286 inline void BufferedCommunicator::MessageGatherer<Data,GatherScatter,FORWARD,SizeOne>::operator()(
const InterfaceMap& interfaces,
const Data& data, Type* buffer,
size_t bufferSize)
const
1288 typedef typename InterfaceMap::const_iterator
1290 const const_iterator end = interfaces.end();
1294 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1296 for(const_iterator interfacePair = interfaces.begin();
1297 interfacePair != end; ++interfacePair){
1298 size_t size = FORWARD ? interfacePair->second.first.size() :
1299 interfacePair->second.second.size();
1301 for(
size_t i=0; i < size; i++){
// bufferSize is in bytes, index counts items.
1303 #ifdef DUNE_ISTL_WITH_CHECKING
1304 assert(bufferSize>=(index+1)*
sizeof(
typename CommPolicy<Data>::IndexedType));
1307 buffer[index++] = GatherScatter::gather(data, FORWARD ? interfacePair->second.first[i] :
1308 interfacePair->second.second[i]);
/**
 * @brief Scatters the items received from process `proc` into `data` for
 * entries holding a variable number of items.
 *
 * @param interfaces Interface information per process; must contain `proc`.
 * @param data       Container the items are written to.
 * @param buffer     Start of the items received from `proc`.
 * @param proc       Process the buffer was received from.
 *
 * For FORWARD communication the second InterfaceInformation of the pair
 * (the receive side) is used, otherwise the first.
 */
1315 template<
class Data,
class GatherScatter,
bool FORWARD>
1316 inline void BufferedCommunicator::MessageScatterer<Data,GatherScatter,FORWARD,VariableSize>::operator()(
const InterfaceMap& interfaces, Data& data, Type* buffer,
const int& proc)
const
1318 typedef typename InterfaceMap::value_type::second_type::first_type Information;
1319 const typename InterfaceMap::const_iterator infoPair = interfaces.find(proc);
1321 assert(infoPair!=interfaces.end());
1323 const Information& info = FORWARD ? infoPair->second.second :
1324 infoPair->second.first;
// `index` walks the buffer, `i` the entries and `j` the items of an entry.
1326 for(
size_t i=0, index=0; i < info.size(); i++){
1327 for(
size_t j=0; j < CommPolicy<Data>::getSize(data, info[i]); j++)
1328 GatherScatter::scatter(data, buffer[index++], info[i], j);
/**
 * @brief Scatters the items received from process `proc` into `data` for
 * entries holding exactly one item.
 *
 * @param interfaces Interface information per process; must contain `proc`.
 * @param data       Container the items are written to.
 * @param buffer     Start of the items received from `proc`.
 * @param proc       Process the buffer was received from.
 *
 * For FORWARD communication the second InterfaceInformation of the pair
 * (the receive side) is used, otherwise the first.
 */
1333 template<
class Data,
class GatherScatter,
bool FORWARD>
1334 inline void BufferedCommunicator::MessageScatterer<Data,GatherScatter,FORWARD,SizeOne>::operator()(
const InterfaceMap& interfaces, Data& data, Type* buffer,
const int& proc)
const
1336 typedef typename InterfaceMap::value_type::second_type::first_type Information;
1337 const typename InterfaceMap::const_iterator infoPair = interfaces.find(proc);
1339 assert(infoPair!=interfaces.end());
1341 const Information& info = FORWARD ? infoPair->second.second :
1342 infoPair->second.first;
// Buffer position i corresponds to the i-th index of the interface.
1344 for(
size_t i=0; i < info.size(); i++){
1345 GatherScatter::scatter(data, buffer[i], info[i]);
// Forward communication on a single container: `data` is both source and
// destination.
// NOTE(review): the function header line is not visible in this excerpt.
1350 template<
class GatherScatter,
class Data>
1353 this->
template sendRecv<GatherScatter,true>(data, data);
// Backward communication on a single container: `data` is both source and
// destination.
// NOTE(review): the function header line is not visible in this excerpt.
1357 template<
class GatherScatter,
class Data>
1360 this->
template sendRecv<GatherScatter,false>(data, data);
// Forward communication from `source` into `dest`.
// NOTE(review): the function header line is not visible in this excerpt.
1364 template<
class GatherScatter,
class Data>
1367 this->
template sendRecv<GatherScatter,true>(source, dest);
// Backward communication: the roles are swapped, data is gathered from
// `dest` and scattered into `source`.
// NOTE(review): the function header line is not visible in this excerpt.
1371 template<
class GatherScatter,
class Data>
1374 this->
template sendRecv<GatherScatter,false>(dest, source);
/**
 * @brief Performs one buffered communication step.
 *
 * Gathers the items of `source` into the send buffer, posts one
 * non-blocking receive and one synchronous non-blocking send per process,
 * scatters every received message into `dest` as soon as it completes and
 * finally waits for the sends.
 *
 * @tparam GatherScatter Policy providing gather/scatter.
 * @tparam FORWARD       Direction; selects which buffer is used for sending
 *                       and which message description applies.
 * @param source Container the data is gathered from.
 * @param dest   Container the received data is scattered into.
 *
 * Fixes in this revision:
 *  - the error message of the send-completion loop reported
 *    processMap[finished] (the index of the last completed receive);
 *    it now reports processMap[i], the process of the failed send;
 *  - the verbose log of the backward receive said "receiving ... to";
 *    it now says "from".
 *
 * NOTE(review): several lines (braces, the if/else on FORWARD, trailing
 * arguments of the MPI calls, declarations of rank / lrank / i / status)
 * are not visible in this excerpt and are left untouched.
 */
1378 template<
class GatherScatter,
bool FORWARD,
class Data>
1379 void BufferedCommunicator::sendRecv(
const Data& source, Data& dest)
1383 MPI_Comm_rank(MPI_COMM_WORLD,&rank);
1384 MPI_Comm_rank(MPI_COMM_WORLD,&lrank);
1386 typedef typename CommPolicy<Data>::IndexedType Type;
1387 Type *sendBuffer, *recvBuffer;
1388 size_t sendBufferSize;
1390 size_t recvBufferSize;
// First variant: buffer 0 is the send buffer, buffer 1 the receive buffer.
1394 sendBuffer =
reinterpret_cast<Type*
>(buffers_[0]);
1395 sendBufferSize = bufferSize_[0];
1396 recvBuffer =
reinterpret_cast<Type*
>(buffers_[1]);
1398 recvBufferSize = bufferSize_[1];
// Second variant: the roles of the two buffers are swapped.
1401 sendBuffer =
reinterpret_cast<Type*
>(buffers_[1]);
1402 sendBufferSize = bufferSize_[1];
1403 recvBuffer =
reinterpret_cast<Type*
>(buffers_[0]);
1405 recvBufferSize = bufferSize_[0];
1408 typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1410 MessageGatherer<Data,GatherScatter,FORWARD,Flag>()(interfaces_, source, sendBuffer, sendBufferSize);
1412 MPI_Request* sendRequests =
new MPI_Request[messageInformation_.size()];
1413 MPI_Request* recvRequests =
new MPI_Request[messageInformation_.size()];
1416 typedef typename InformationMap::const_iterator const_iterator;
1418 const const_iterator end = messageInformation_.end();
// processMap[k] is the process belonging to request k.
1420 int* processMap =
new int[messageInformation_.size()];
// Post the receives. start_ counts items, size_ counts bytes.
1422 for(const_iterator info = messageInformation_.begin(); info != end; ++info, ++i){
1423 processMap[i]=info->first;
1425 assert(info->second.second.start_*
sizeof(
typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= recvBufferSize );
1426 Dune::dvverb<<rank<<
": receiving "<<info->second.second.size_<<
" from "<<info->first<<std::endl;
1427 MPI_Irecv(recvBuffer+info->second.second.start_, info->second.second.size_,
1428 MPI_BYTE, info->first, commTag_, communicator_,
1431 assert(info->second.first.start_*
sizeof(
typename CommPolicy<Data>::IndexedType)+info->second.first.size_ <= recvBufferSize );
1432 Dune::dvverb<<rank<<
": receiving "<<info->second.first.size_<<
" from "<<info->first<<std::endl;
1433 MPI_Irecv(recvBuffer+info->second.first.start_, info->second.first.size_,
1434 MPI_BYTE, info->first, commTag_, communicator_,
// Post the synchronous sends in the same map order as the receives.
1441 for(const_iterator info = messageInformation_.begin(); info != end; ++info, ++i)
1443 assert(info->second.second.start_*
sizeof(
typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= recvBufferSize );
1444 Dune::dvverb<<rank<<
": sending "<<info->second.first.size_<<
" to "<<info->first<<std::endl;
1445 assert(info->second.first.start_*
sizeof(
typename CommPolicy<Data>::IndexedType)+info->second.first.size_ <= sendBufferSize );
1446 MPI_Issend(sendBuffer+info->second.first.start_, info->second.first.size_,
1447 MPI_BYTE, info->first, commTag_, communicator_,
1450 assert(info->second.second.start_*
sizeof(
typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= sendBufferSize );
1451 Dune::dvverb<<rank<<
": sending "<<info->second.second.size_<<
" to "<<info->first<<std::endl;
1452 MPI_Issend(sendBuffer+info->second.second.start_, info->second.second.size_,
1453 MPI_BYTE, info->first, commTag_, communicator_,
1460 int finished = MPI_UNDEFINED;
// Process the receives in the order in which they complete.
1464 for(i=0;i< messageInformation_.size();i++){
1465 status.MPI_ERROR=MPI_SUCCESS;
1466 MPI_Waitany(messageInformation_.size(), recvRequests, &finished, &status);
1467 assert(finished != MPI_UNDEFINED);
1469 if(status.MPI_ERROR==MPI_SUCCESS){
1470 int& proc = processMap[finished];
1471 typename InformationMap::const_iterator infoIter = messageInformation_.find(proc);
1472 assert(infoIter != messageInformation_.end());
1474 MessageInformation info = (FORWARD)? infoIter->second.second : infoIter->second.first;
1475 assert(info.start_+info.size_ <= recvBufferSize);
1477 MessageScatterer<Data,GatherScatter,FORWARD,Flag>()(interfaces_, dest, recvBuffer+info.start_, proc);
1479 std::cerr<<rank<<
": MPI_Error occurred while receiving message from "<<processMap[finished]<<std::endl;
1484 MPI_Status recvStatus;
// Wait for the sends; request i belongs to process processMap[i].
1487 for(i=0;i< messageInformation_.size();i++)
1488 if(MPI_SUCCESS!=MPI_Wait(sendRequests+i, &recvStatus)){
1489 std::cerr<<rank<<
": MPI_Error occurred while sending message to "<<processMap[i]<<std::endl;
1499 delete[] processMap;
1500 delete[] sendRequests;
1501 delete[] recvRequests;