Skip to content

Commit

Permalink
Share startup now more responsive and better explained, and disappear…
Browse files Browse the repository at this point in the history
…ed files under shares better dealt-with
  • Loading branch information
macavity23 committed Jul 19, 2011
1 parent 2ddd2a3 commit 90c2e0b
Show file tree
Hide file tree
Showing 11 changed files with 171 additions and 90 deletions.
2 changes: 1 addition & 1 deletion console/src/java/com/robonobo/console/cmds/share.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public void run(RobonoboConsole console, String[] args, final PrintWriter out) t
return;
}
if (args[0].equalsIgnoreCase("all")) {
Set<String> shareStreamIds = controller.getShares();
Set<String> shareStreamIds = controller.getShareStreamIds();
List<SharedTrack> shares = new ArrayList<SharedTrack>();
for (String streamId : shareStreamIds) {
Track t = controller.getTrack(streamId);
Expand Down
17 changes: 11 additions & 6 deletions core/src/java/com/robonobo/core/RobonoboController.java
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,11 @@ public Map<String, List<File>> getITunesPlaylists(FileFilter filter) throws Robo
}
}

/** Notifies that this streamId is going to be played shortly and should be downloaded/prioritized as necessary
/** Notifies that this streamId is going to be played shortly and should be downloaded/prioritized as necessary
*
* @throws RobonoboException */
public void preFetch(String streamId) throws RobonoboException {
if(inst.getShareService().getShare(streamId) != null)
if (inst.getShareService().getShare(streamId) != null)
return;
inst.getDownloadService().preFetch(streamId);
}
Expand Down Expand Up @@ -236,8 +237,12 @@ public String getMimeTypeForFile(File f) {
return inst.getFormatService().getMimeTypeForFile(f);
}

public Set<String> getShares() {
return inst.getDbService().getShares();
public int getNumShares() {
return inst.getDbService().getNumShares();
}

public Set<String> getShareStreamIds() {
return inst.getDbService().getShareSids();
}

public List<SharedTrack> getSharesByPattern(String searchPattern) {
Expand Down Expand Up @@ -444,7 +449,7 @@ public void createPlaylist(Playlist p, PlaylistCallback handler) {
public void addFriends(Collection<String> emails) {
inst.getUserService().addFriends(emails);
}

public void sharePlaylist(Playlist p, Set<Long> friendIds, Set<String> emails) throws RobonoboException {
try {
inst.getPlaylistService().sharePlaylist(p, friendIds, emails);
Expand Down Expand Up @@ -508,7 +513,7 @@ public void requestTopUp() throws IOException {
public Library getFriendLibrary(long userId) {
return inst.getDbService().getLibrary(userId);
}

public UpdateInfo getUpdateInfo() throws RobonoboException {
try {
return inst.getHttpService().getUpdateInfo();
Expand Down
22 changes: 21 additions & 1 deletion core/src/java/com/robonobo/core/service/DbService.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class DbService extends AbstractService {
private static final String DELETE_PLAYLIST_SEEN_SIDS = "DELETE FROM playlist_seen_sids WHERE playlist_id = ?";
private static final String DELETE_LIBRARY_UNKNOWN_TRACK = "DELETE FROM library_unknown_tracks WHERE user_id = ? AND stream_id = ?";
private static final String GET_ALL_SHARE_STREAM_IDS = "SELECT STREAM_ID FROM SHARES";
private static final String GET_COUNT_SHARES = "SELECT COUNT(*) FROM SHARES";
private static final String GET_ALL_DOWNLOAD_STREAM_IDS = "SELECT STREAM_ID FROM DOWNLOADS ORDER BY DATE_STARTED ASC";
private static final String GET_NUM_RUNNING_DOWNLOADS = "SELECT COUNT(*) FROM DOWNLOADS WHERE STATUS = 'Downloading'";
private static final String GET_ALL_WATCHDIRS = "SELECT * FROM WATCHDIRS";
Expand Down Expand Up @@ -196,7 +197,7 @@ public List<File> getWatchDirs() {
}
}

public Set<String> getShares() {
public Set<String> getShareSids() {
if (!running)
return null;
Connection conn = null;
Expand All @@ -218,6 +219,25 @@ public Set<String> getShares() {
}
}

public int getNumShares() {
if (!running)
return 0;
Connection conn = null;
try {
conn = getConnection();
PreparedStatement ps = conn.prepareStatement(GET_COUNT_SHARES);
ResultSet rs = ps.executeQuery();
rs.next();
return rs.getInt(1);
} catch (SQLException e) {
log.error("Error retrieving shares from db", e);
return 0;
} finally {
if (conn != null)
returnConnection(conn);
}
}

public Collection<SharedTrack> getSharesByPattern(String searchPattern) {
if (!running)
return null;
Expand Down
2 changes: 2 additions & 0 deletions core/src/java/com/robonobo/core/service/DownloadService.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ public void shutdown() throws Exception {
}

public void addDownload(String streamId) throws RobonoboException {
// If we have a defunct share, replace it with this download
rbnb.getShareService().nukeDefunctShare(streamId);
File dataFile = new File(downloadsDir, makeFileNameSafe(streamId));
log.info("Adding download for " + streamId);
Stream s = streams.getKnownStream(streamId);
Expand Down
132 changes: 89 additions & 43 deletions core/src/java/com/robonobo/core/service/ShareService.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,32 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import javax.swing.text.StyledEditorKit.ForegroundAction;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.robonobo.common.concurrent.Batcher;
import com.robonobo.common.concurrent.CatchingRunnable;
import com.robonobo.common.exceptions.Errot;
import com.robonobo.common.pageio.buffer.FilePageBuffer;
import com.robonobo.common.util.FileUtil;
import com.robonobo.core.api.RobonoboException;
import com.robonobo.core.api.Task;
import com.robonobo.core.api.model.*;
import com.robonobo.core.api.model.DownloadingTrack.DownloadStatus;
import com.robonobo.core.api.model.SharedTrack.ShareStatus;
import com.robonobo.core.storage.StorageService;
import com.robonobo.mina.external.MinaControl;
import com.robonobo.mina.external.buffer.PageBuffer;
import com.robonobo.spi.FormatSupportProvider;
import com.sun.org.apache.bcel.internal.generic.StoreInstruction;

/** Responsible for translating mina Broadcasts to robonobo Shares
*
* @author macavity */
public class ShareService extends AbstractService {
private static final long START_SHARE_BATCH_TIME = 1000L;
/** Seconds */
// public static final int WATCHDIR_CHECK_FREQ = 60 * 10;
public static final int WATCHDIR_CHECK_FREQ = 30;
Expand All @@ -41,7 +47,9 @@ public class ShareService extends AbstractService {
StreamService streams;
PlaybackService playback;
MinaControl mina;
private Set<String> shareStreamIds;
/** These are shares that have been added but whose files are no longer present - we keep track of these in case they
* are on removable drives that might be plugged back in later, but we don't show them to the user */
Set<String> defunctShareSids = new HashSet<String>();
private ScheduledFuture<?> watchDirTask;
private boolean watchDirRunning = false;

Expand All @@ -60,8 +68,6 @@ public void startup() throws Exception {
streams = rbnb.getStreamService();
playback = rbnb.getPlaybackService();
mina = rbnb.getMina();
// Keep track of our stream ids, everything else loaded on-demand from the db
shareStreamIds = db.getShares();
watchDirTask = getRobonobo().getExecutor().scheduleWithFixedDelay(new WatchDirChecker(), WATCHDIR_INITIAL_WAIT, WATCHDIR_CHECK_FREQ, TimeUnit.SECONDS);
}

Expand All @@ -71,6 +77,20 @@ public void shutdown() {
watchDirTask.cancel(true);
}

/** If there is a defunct share (ie one with a non-existent file behind it), nuke it */
public void nukeDefunctShare(String sid) {
boolean isDefunct;
synchronized (this) {
isDefunct = defunctShareSids.contains(sid);
if (isDefunct)
defunctShareSids.remove(sid);
}
if (isDefunct) {
storage.nukePageBuf(sid);
db.deleteShare(sid);
}
}

/** NB The stream referenced by stream id must already have been put into streamservice
*
* @param streamId
Expand All @@ -79,6 +99,8 @@ public void shutdown() {
public void addShare(Stream s, File dataFile) throws RobonoboException {
String streamId = s.getStreamId();
log.info("Adding share for id " + streamId + " at " + dataFile.getAbsolutePath());
// If we have a defunct share for this (ie one with a nonexistent file behind it), replace it with this one
nukeDefunctShare(streamId);
SharedTrack sh = db.getShare(streamId);
if (sh != null) {
log.info("Not adding share for id " + streamId + " - sharing already");
Expand All @@ -97,9 +119,6 @@ public void addShare(Stream s, File dataFile) throws RobonoboException {
}
sh.setDateAdded(now());
db.putShare(sh);
synchronized (this) {
shareStreamIds.add(s.getStreamId());
}
startShare(streamId);
rbnb.getLibraryService().addToMyLibrary(streamId);
event.fireTrackUpdated(s.getStreamId());
Expand Down Expand Up @@ -140,9 +159,6 @@ public void addShareFromCompletedDownload(DownloadingTrack d) throws RobonoboExc
SharedTrack sh = new SharedTrack(s, shareFile, ShareStatus.Sharing);
sh.setDateAdded(now());
db.putShare(sh);
synchronized (this) {
shareStreamIds.add(s.getStreamId());
}
try {
rbnb.getStorageService().createPageBufForShare(s, shareFile, false);
} catch (IOException e) {
Expand All @@ -162,9 +178,6 @@ public void deleteShare(String streamId) {
stopShare(streamId);
db.deleteShare(streamId);
storage.nukePageBuf(streamId);
synchronized (this) {
shareStreamIds.remove(streamId);
}
rbnb.getLibraryService().delFromMyLibrary(streamId);
event.fireTrackUpdated(streamId);
event.fireMyLibraryUpdated();
Expand All @@ -188,10 +201,11 @@ public String getProvides() {

public SharedTrack getShare(String streamId) {
synchronized (this) {
if (!shareStreamIds.contains(streamId))
if (defunctShareSids.contains(streamId))
return null;
}
return db.getShare(streamId);
SharedTrack share = db.getShare(streamId);
return share;
}

public Collection<SharedTrack> getSharesByPattern(String searchPattern) {
Expand Down Expand Up @@ -240,39 +254,71 @@ public void checkWatchDir(File watchDir) throws RobonoboException {
}

void startAllShares() throws IOException, RobonoboException {
log.debug("Start Share thread running");
// Copy out our stream ids so we can iterate while adding new shares
String[] arr;
synchronized (this) {
arr = new String[shareStreamIds.size()];
shareStreamIds.toArray(arr);
if(db.getNumShares() > 0)
rbnb.getTaskService().runTask(new StartSharesTask());
else {
rbnb.getTrackService().setAllSharesStarted(true);
event.fireAllTracksLoaded();
event.fireMyLibraryUpdated();
}
Set<String> shareSids = new HashSet<String>();
for (String sid : arr) {
// We don't cache the page buffer unless we need it (there could be 10^4+), just look it up to make sure
// it's kosher
FilePageBuffer pb = storage.getPageBuf(sid, false);
if (pb == null) {
// Errot
log.error("Found null pagebuf when starting share for " + sid + " - deleting share");
db.deleteShare(sid);
continue;
}
// If the file for this share doesn't exist, don't start this share but keep the reference around - might be
// a removable drive that will come back later
File file = pb.getFile();
if (!file.exists() || !file.canRead()) {
log.error("Could not find or read from file " + file.getAbsolutePath() + " for stream " + sid + " - not starting share");
synchronized (this) {
shareStreamIds.remove(sid);
}

class StartSharesTask extends Task {
boolean finished = false;
public StartSharesTask() {
title = "Starting my shares";
}

@Override
public void runTask() throws Exception {
Set<String> shareSids = db.getShareSids();
final int numToStart = shareSids.size();
// Iterate over these and check that they're ok, punting them up to the gui in batches
Batcher<String> puntToGuiBatcher = new Batcher<String>(START_SHARE_BATCH_TIME, rbnb.getExecutor()) {
int i=1;
protected void runBatch(Collection<String> sids) throws Exception {
rbnb.getMina().startBroadcasts(sids);
event.fireTracksUpdated(sids);
i += sids.size();
if(!finished) {
completion = ((float)i)/numToStart;
statusText = "Starting share "+i+" of "+numToStart;
fireUpdated();
}
}
};
statusText = "Starting share 1 of "+numToStart;
fireUpdated();
for (String sid : shareSids) {
FilePageBuffer pb = storage.getPageBuf(sid, false);
if (pb == null) {
// Errot
log.error("Found null pagebuf when starting share for " + sid + " - deleting share");
db.deleteShare(sid);
continue;
}
// If the file for this share doesn't exist, don't start this share but keep the reference around -
// might be
// a removable drive that will come back later
File file = pb.getFile();
if (!file.exists() || !file.canRead()) {
log.error("Could not find or read from file " + file.getAbsolutePath() + " for stream " + sid + " - not starting share");
synchronized (this) {
defunctShareSids.add(sid);
}
continue;
}
continue;
puntToGuiBatcher.add(sid);
}
shareSids.add(sid);
log.debug("Start Share thread finished: started " + shareSids.size() + " shares");
finished = true;
completion = 1f;
statusText = "Done.";
fireUpdated();
rbnb.getTrackService().setAllSharesStarted(true);
event.fireAllTracksLoaded();
event.fireMyLibraryUpdated();
}
if(shareSids.size() > 0)
rbnb.getMina().startBroadcasts(shareSids);
log.debug("Start Share thread finished: started " + shareSids.size() + " shares");
}

private class WatchDirChecker extends CatchingRunnable {
Expand Down
19 changes: 7 additions & 12 deletions core/src/java/com/robonobo/core/service/TrackService.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,9 @@ public void startup() throws Exception {
playback = rbnb.getPlaybackService();
event = rbnb.getEventService();
mina = rbnb.getMina();
// Fork off a thread to start shares, we might have a lot... do this here rather than in shareservice as we
// start after them and we need to be present
// to fire allSharesStarted
log.debug("Spawning thread to start shares");
getRobonobo().getExecutor().execute(new CatchingRunnable() {
public void doRun() throws Exception {
share.startAllShares();
allSharesStarted = true;
event.fireAllTracksLoaded();
event.fireMyLibraryUpdated();
}
});
// Do this here rather than in shareservice as we
// start after them and we need to be present to fire allTracksLoaded
share.startAllShares();
event.addTransferSpeedListener(this);
started = true;
}
Expand Down Expand Up @@ -162,4 +153,8 @@ public void notifyPlayingTrackChange(String newPlayingStreamId) {
public boolean haveAllSharesStarted() {
return allSharesStarted;
}

public void setAllSharesStarted(boolean allSharesStarted) {
this.allSharesStarted = allSharesStarted;
}
}
Binary file added gui/src/img/icon/icon-16-palm-1.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added gui/src/img/icon/icon-16-radio-1.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
4 changes: 2 additions & 2 deletions gui/src/java/com/robonobo/gui/frames/RobonoboFrame.java
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ public void openRbnbUri(String uri) {

public void showWelcome(boolean forceShow) {
// If we have no shares (or we're forcing it), show the welcome dialog
final boolean gotShares = (control.getShares().size() > 0);
final boolean gotShares = (control.getNumShares() > 0);
if (forceShow || (!gotShares && guiConfig.getShowWelcomePanel())) {
SwingUtilities.invokeLater(new CatchingRunnable() {
public void doRun() throws Exception {
Expand Down Expand Up @@ -517,7 +517,7 @@ public void confirmThenShutdown() {
invokeLater(new CatchingRunnable() {
public void doRun() throws Exception {
// If we aren't sharing anything, just close
if (getController().getShares().size() == 0) {
if (getController().getNumShares() == 0) {
shutdown();
return;
}
Expand Down
Loading

0 comments on commit 90c2e0b

Please sign in to comment.