Skip to content

Commit

Permalink
Added log prints into RIStream to trace unexpected EOF when unzipping…
Browse files Browse the repository at this point in the history
… files from s3 via hdfs.
  • Loading branch information
tomasnykodym committed May 8, 2013
1 parent ec506a9 commit 7190b31
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 21 deletions.
23 changes: 14 additions & 9 deletions src/main/java/water/hdfs/PersistHdfs.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,22 +59,27 @@ public H2OHdfsInputStream(Path p, long offset,ProgressMonitor pmon) throws IOExc
super(offset,pmon);
_path = p;
_fs = FileSystem.get(p.toUri(), CONF);
setExpectedSz(_fs.getFileStatus(p).getLen());
open();
}

@Override protected InputStream open(long offset) {
try{
FSDataInputStream is = _fs.open(_path);
is.seek(offset);
return is;
} catch(IOException e){
throw new RuntimeException("Opening Hdfs path " + _path.toString() + " failed!");
}
@Override protected InputStream open(long offset) throws IOException {
FSDataInputStream is = _fs.open(_path);
is.seek(offset);
return is;
}

}
public static InputStream openStream(Key k,ProgressMonitor pmon) throws IOException{
return new H2OHdfsInputStream(getPathForKey(k),0,pmon);
H2OHdfsInputStream res = null;
try{
res = new H2OHdfsInputStream(getPathForKey(k),0,pmon);
} catch(IOException e){
try{Thread.sleep(1000);} catch(Exception ex){}
Log.warn("Error while opening HDFS key " + k.toString() + ", will wait and retry.");
res = new H2OHdfsInputStream(getPathForKey(k),0,pmon);
}
return res;
}

public static void addFolder(Path p, JsonArray succeeded, JsonArray failed) throws IOException {
Expand Down
52 changes: 40 additions & 12 deletions src/main/java/water/util/RIStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import java.io.IOException;
import java.io.InputStream;

import javax.management.RuntimeErrorException;

import water.Job.ProgressMonitor;
import water.Key;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
Expand All @@ -10,31 +13,47 @@
public abstract class RIStream extends InputStream {
InputStream _is;
ProgressMonitor _pmon;
public final int _retries = 3;
public final int _retries = 5;
String [] _bk;
private long _off;

boolean _knownSize;
long _expectedSz;
/**
 * Creates a stream that will read starting at {@code off}. The underlying
 * stream is not opened here; that happens in {@code open()}.
 *
 * @param off  byte offset passed to {@code open(long)} when the underlying stream is opened
 * @param pmon progress monitor to notify as bytes are consumed; may be {@code null}
 */
protected RIStream( long off, ProgressMonitor pmon){
  _off = off;
  // The monitor used to be accepted and then dropped, which left _pmon null
  // and made every "_pmon != null" progress update in the read paths a no-op.
  // NOTE(review): confirm no caller already reports the same bytes itself.
  _pmon = pmon;
}

/**
 * Records the total number of bytes this stream is expected to deliver and
 * flags the size as known, which turns on the size-based checks in this class.
 *
 * @param sz expected total size in bytes
 */
public void setExpectedSz(long sz){
  this._expectedSz = sz;
  this._knownSize = true;
}
public final void open(){
assert _is == null;
_is = open(_off);
try{
_is = open(_off);
} catch(IOException e){
throw new RuntimeException(e);
}
}

protected abstract InputStream open(long offset);
protected abstract InputStream open(long offset) throws IOException;

/**
 * Closes this stream on a best-effort basis: any exception thrown by
 * {@code close()} is discarded instead of being propagated to the caller.
 */
public void closeQuietly(){
  try {
    close();
  } catch (Exception ignored) {
    // deliberate: a failure to close is not reported
  }
}
private void try2Recover(int attempt, IOException e) {
System.out.println("[H2OS3InputStream] Attempt("+attempt + ") to recover from " + e.getMessage() + "), off = " + _off);
e.printStackTrace();
if(attempt == _retries) Throwables.propagate(e);
Log.warn("[H2OS3InputStream] Attempt("+attempt + ") to recover from " + e.getMessage() + "), off = " + _off);
try{_is.close();}catch(IOException ex){}
_is = null;
if(attempt > 0) try {Thread.sleep(256 << attempt);}catch(InterruptedException ex){}
open(_off);
open();
return;
}
/**
 * Advances the current stream offset by the given number of bytes.
 * When the expected size is known, asserts that the new offset does not
 * exceed it (checked only when assertions are enabled).
 *
 * @param delta number of bytes just consumed from the underlying stream
 */
private void updateOffset(int delta) {
  final long advanced = delta + _off;
  if (_knownSize) {
    assert advanced <= _expectedSz;
  }
  _off = advanced;
}

@Override
public boolean markSupported(){
return false;
Expand All @@ -49,6 +68,9 @@ public final int available() throws IOException {
int attempts = 0;
while(true){
try {
int res = _is.available();
if(res == 0 && _knownSize && _off < _expectedSz)
Log.warn("premature end of file reported, expected " + _expectedSz + " bytes, but got eof after " + _off + " bytes");
return _is.available();
} catch (IOException e) {
try2Recover(attempts++,e);
Expand All @@ -62,8 +84,10 @@ public int read() throws IOException {
while(true){
try{
int res = _is.read();
if(res == -1 && _knownSize && _off < _expectedSz)
Log.warn("premature end of file reported, expected " + _expectedSz + " bytes, but got eof after " + _off + " bytes");
if(res != -1){
_off += 1;
updateOffset(1);
if(_pmon != null)_pmon.update(1);
}
return res;
Expand All @@ -79,8 +103,10 @@ public int read(byte [] b) throws IOException {
while(true){
try {
int res = _is.read(b);
if(res == 0 && _knownSize && _off < _expectedSz)
Log.warn("premature end of file reported, expected " + _expectedSz + " bytes, but got eof after " + _off + " bytes");
if(res > 0){
_off += res;
updateOffset(res);
if(_pmon != null)_pmon.update(res);
}
return res;
Expand All @@ -95,9 +121,11 @@ public int read(byte [] b, int off, int len) throws IOException {
int attempts = 0;
while(true){
try {
int res = _is.read(b,off,len);;
int res = _is.read(b,off,len);
if(res == 0 && _knownSize && _off < _expectedSz)
Log.warn("premature end of file reported, expected " + _expectedSz + " bytes, but got eof after " + _off + " bytes");
if(res > 0){
_off += res;
updateOffset(res);
if(_pmon != null)_pmon.update(res);
}
return res;
Expand All @@ -122,7 +150,7 @@ public long skip(long n) throws IOException {
try{
long res = _is.skip(n);
if(res > 0){
_off += res;
updateOffset((int)res);
if(_pmon != null)_pmon.update(res);
}
return res;
Expand Down

0 comments on commit 7190b31

Please sign in to comment.