Skip to content

Commit

Permalink
Fix to check bytes accessed by MPIIO xfers and retry if possible
Browse files Browse the repository at this point in the history
  • Loading branch information
mtmoore-hpe committed Dec 13, 2024
1 parent 51d8a79 commit b98f10f
Showing 1 changed file with 126 additions and 81 deletions.
207 changes: 126 additions & 81 deletions src/aiori-MPIIO.c
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,10 @@ static IOR_offset_t MPIIO_Xfer(int access, aiori_fd_t * fdp, IOR_size_t * buffer
* MPI_Datatype, MPI_Status *);
*/
MPI_Status status;
MPI_Count elementsAccessed;
MPI_Count elementSize;
IOR_offset_t xferBytes, expectedBytes;
int retryAccess = 1, xferRetries = 0;

/* point functions to appropriate MPIIO calls */
if (access == WRITE) { /* WRITE */
Expand Down Expand Up @@ -443,100 +447,141 @@ static IOR_offset_t MPIIO_Xfer(int access, aiori_fd_t * fdp, IOR_size_t * buffer
* Access_ordered = MPI_File_read_ordered;
*/
}

/*
* 'useFileView' uses derived datatypes and individual file pointers
*/
if (param->useFileView) {
/* find offset in file */
if (SeekOffset(mfd->fd, offset, module_options) <
0) {
/* if unsuccessful */
length = -1;
} else {

/*
* 'useStridedDatatype' fits multi-strided pattern into a datatype;
* must use 'length' to determine repetitions (fix this for
* multi-segments someday, WEL):
* e.g., 'IOR -s 2 -b 32K -t 32K -a MPIIO --mpiio.useStridedDatatype --mpiio.useFileView'
*/
if (param->useStridedDatatype) {
if(offset >= (rank+1) * hints->blockSize){
/* we shall write only once per transferSize */
/* printf("FAKE access %d %lld\n", rank, offset); */
return hints->transferSize;
}
length = hints->segmentCount;
MPI_CHECK(MPI_File_set_view(mfd->fd, offset,
mfd->contigType,
mfd->fileType,
"native",
(MPI_Info) MPI_INFO_NULL), "cannot set file view");
/* printf("ACCESS %d %lld -> %lld\n", rank, offset, length); */
}else{
length = 1;
}
if (hints->collective) {
/* individual, collective call */
MPI_CHECK(Access_all
(mfd->fd, buffer, length,
mfd->transferType, &status),
"cannot access collective");
} else {
/* individual, noncollective call */
MPI_CHECK(Access
(mfd->fd, buffer, length,
mfd->transferType, &status),
"cannot access noncollective");
}
/* MPI-IO driver does "nontcontiguous" by transfering
* 'segment' regions of 'transfersize' bytes, but
* our caller WriteOrReadSingle does not know how to
* deal with us reporting that we wrote N times more
* data than requested. */
length = hints->transferSize;
}
} else {
while( retryAccess ) {
/*
* !useFileView does not use derived datatypes, but it uses either
* shared or explicit file pointers
* 'useFileView' uses derived datatypes and individual file pointers
*/
if (param->useSharedFilePointer) {
if (param->useFileView) {
/* find offset in file */
if (SeekOffset
(mfd->fd, offset, module_options) < 0) {
if (SeekOffset(mfd->fd, offset, module_options) <
0) {
/* if unsuccessful */
length = -1;
retryAccess = 0; // don't retry
} else {
/* shared, collective call */

/*
* this needs to be properly implemented:
*
* MPI_CHECK(Access_ordered(fd.MPIIO, buffer, length,
* MPI_BYTE, &status),
* "cannot access shared, collective");
*/
fprintf(stdout,
"useSharedFilePointer not implemented\n");
* 'useStridedDatatype' fits multi-strided pattern into a datatype;
* must use 'length' to determine repetitions (fix this for
* multi-segments someday, WEL):
* e.g., 'IOR -s 2 -b 32K -t 32K -a MPIIO --mpiio.useStridedDatatype --mpiio.useFileView'
*/
MPI_CHECK(MPI_Type_size_x(mfd->transferType, &elementSize), "couldn't get size of type");
if (param->useStridedDatatype) {
if(offset >= (rank+1) * hints->blockSize){
/* we shall write only once per transferSize */
/* printf("FAKE access %d %lld\n", rank, offset); */
return hints->transferSize;
}
length = hints->segmentCount;
MPI_CHECK(MPI_File_set_view(mfd->fd, offset,
mfd->contigType,
mfd->fileType,
"native",
(MPI_Info) MPI_INFO_NULL), "cannot set file view");
expectedBytes = hints->segmentCount * elementSize;
/* printf("ACCESS %d %lld -> %lld\n", rank, offset, length); */
/* get the size of transferType; length is overloaded and used
* set to number of segments */
}else{
length = 1;
expectedBytes = elementSize;
}
if (hints->collective) {
/* individual, collective call */
MPI_CHECK(Access_all
(mfd->fd, buffer, length,
mfd->transferType, &status),
"cannot access collective");
} else {
/* individual, noncollective call */
MPI_CHECK(Access
(mfd->fd, buffer, length,
mfd->transferType, &status),
"cannot access noncollective");
}
/* MPI-IO driver does "nontcontiguous" by transfering
* 'segment' regions of 'transfersize' bytes, but
* our caller WriteOrReadSingle does not know how to
* deal with us reporting that we wrote N times more
* data than requested. */
length = hints->transferSize;
MPI_CHECK(MPI_Get_elements_x(&status, MPI_INT, &elementsAccessed),
"can't get elements accessed" );
xferBytes = elementsAccessed * sizeof(MPI_BYTE);
}
} else {
if (hints->collective) {
/* explicit, collective call */
MPI_CHECK(Access_at_all
(mfd->fd, offset,
buffer, length, MPI_BYTE, &status),
"cannot access explicit, collective");
/*
* !useFileView does not use derived datatypes, but it uses either
* shared or explicit file pointers
*/
if (param->useSharedFilePointer) {
/* find offset in file */
if (SeekOffset
(mfd->fd, offset, module_options) < 0) {
/* if unsuccessful */
length = -1;
} else {
/* shared, collective call */
/*
* this needs to be properly implemented:
*
* MPI_CHECK(Access_ordered(fd.MPIIO, buffer, length,
* MPI_BYTE, &status),
* "cannot access shared, collective");
*/
fprintf(stdout,
"useSharedFilePointer not implemented\n");
}
} else {
/* explicit, noncollective call */
MPI_CHECK(Access_at
(mfd->fd, offset,
buffer, length, MPI_BYTE, &status),
"cannot access explicit, noncollective");
if (hints->collective) {
/* explicit, collective call */
MPI_CHECK(Access_at_all
(mfd->fd, offset,
buffer, length, MPI_BYTE, &status),
"cannot access explicit, collective");
} else {
/* explicit, noncollective call */
MPI_CHECK(Access_at
(mfd->fd, offset,
buffer, length, MPI_BYTE, &status),
"cannot access explicit, noncollective");
}
}
MPI_CHECK(MPI_Get_elements_x(&status, MPI_INT, &elementsAccessed),
"can't get elements accessed" );

expectedBytes = length;
xferBytes = elementsAccessed * sizeof(MPI_BYTE);
}

/* Retrying collective xfers would require syncing after every IO to check if any
* IOs were short and all ranks retrying. Instead, report the short access which
* will lead to an abort after returning. Otherwise, retry the IO similarly to
* what's done for POSIX except retrying the full request. expectedBytes is
* transferSize/length unless --mpiio-useStridedDatatype and it will then be
* segmentCount * transferSize */
if (xferBytes != expectedBytes ){
WARNF("task %d, partial %s, %lld of %lld bytes at offset %lld, xferRetries: %d\n",
rank,
access == WRITE ? "write()" : "read()",
xferBytes, expectedBytes,
offset, xferRetries);
/* don't allow retry if collective, return the short xfer amount now */
if (hints->collective) {
WARNF("task %d, not retrying collective %s\n", rank,
access == WRITE ? "write()" : "read()");
return xferBytes;
}
if (xferRetries++ > MAX_RETRY || hints->singleXferAttempt){
WARN("too many retries -- aborting");
return xferBytes;
}
} else {
retryAccess = 0; // expected data transferred, return normally
}
}
return hints->transferSize;
return hints->transferSize; // short xfers already returned in the expectedBytes/retry check
}

/*
Expand Down

0 comments on commit b98f10f

Please sign in to comment.