Cleaning up the multi-threading mess. Fixed the race condition where the crawler would over- or under-crawl.
brianmadden committed Feb 1, 2017
1 parent d0d5baa commit a7fca99
Showing 3 changed files with 17 additions and 30 deletions.
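The race this commit fixes is the classic lost-update problem: the old code wrapped each counter's reads and writes in individually locked get/set accessors, but an expression like ++counter is a read followed by a write taken under two separate lock acquisitions, so two threads can read the same value and one increment disappears or a stale value gets written back, which is how the crawler could fetch more or fewer pages than it reported. A minimal standalone sketch of the difference (RacyCounter and SafeCounter are illustrative names, not code from this repository):

import kotlin.concurrent.thread

// Racy: each get and set is individually guarded, but "count++" is a
// get followed by a set, so two threads can read the same value and
// one of the increments is lost.
class RacyCounter {
    private val lock = Any()
    var count: Int = 0
        get() = synchronized(lock) { field }
        set(value) = synchronized(lock) { field = value }
}

// Fixed (the pattern this commit adopts): the whole read-modify-write
// happens inside a single synchronized block.
class SafeCounter {
    private val lock = Any()
    private var count: Int = 0
    fun increment(): Int = synchronized(lock) { ++count }
    fun current(): Int = synchronized(lock) { count }
}

fun main() {
    val racy = RacyCounter()
    val safe = SafeCounter()
    val workers = (1..8).map {
        thread {
            repeat(10_000) {
                racy.count++     // increments can be lost under contention
                safe.increment() // always ends at exactly 80_000
            }
        }
    }
    workers.forEach { it.join() }
    println("racy = ${racy.count}, safe = ${safe.current()}")
}

Running this with several threads will often print a racy total below 80,000 while the safe total is exactly 80,000.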
12 changes: 3 additions & 9 deletions example/src/main/kotlin/SimpleExample.kt
@@ -22,9 +22,6 @@ import io.thelandscape.krawler.http.KrawlDocument
 import io.thelandscape.krawler.http.KrawlUrl
 import java.time.LocalTime
 import java.util.concurrent.ConcurrentSkipListSet
-import java.util.concurrent.locks.ReentrantReadWriteLock
-import kotlin.concurrent.read
-import kotlin.concurrent.write
 
 class SimpleExample(config: KrawlConfig = KrawlConfig()) : Krawler(config) {
 
@@ -41,18 +38,15 @@ class SimpleExample(config: KrawlConfig = KrawlConfig()) : Krawler(config) {
         return (!FILTERS.matches(withoutGetParams) && url.host in whitelist)
     }
 
-
-    private val counterLock: ReentrantReadWriteLock = ReentrantReadWriteLock()
     private var counter: Int = 0
-        get() = counterLock.read { field }
-        set(value) = counterLock.write { field = value}
+    private val counterLock: Any = Any()
 
     override fun visit(url: KrawlUrl, doc: KrawlDocument) {
-        println("${++counter}. Crawling ${url.canonicalForm}")
+        println("${synchronized(counterLock) {++counter}}. Crawling ${url.canonicalForm}")
     }
 
     override fun onContentFetchError(url: KrawlUrl, reason: String) {
-        println("${++counter}. Tried to crawl ${url.canonicalForm} but failed to read the content.")
+        println("${synchronized(counterLock) {++counter}}. Tried to crawl ${url.canonicalForm} but failed to read the content.")
     }
 
     private var startTimestamp: Long = 0
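In SimpleExample.kt the ReentrantReadWriteLock-backed accessors are gone; the counter is a plain Int guarded by an Any monitor, and the increment happens inside the synchronized block at the point of use, so the number printed is the value that was actually stored. If only an atomic counter were needed, java.util.concurrent.atomic.AtomicInteger would be a lock-free alternative; a hypothetical variant, not what this commit does:

import java.util.concurrent.atomic.AtomicInteger

// Hypothetical alternative to the Any-monitor approach above:
// incrementAndGet() performs the read-modify-write as one atomic step.
private val counter = AtomicInteger(0)

fun visitMessage(canonicalForm: String): String =
    "${counter.incrementAndGet()}. Crawling $canonicalForm"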
17 changes: 5 additions & 12 deletions ScheduledQueue.kt
@@ -23,29 +23,22 @@ import com.google.common.cache.CacheLoader
 import com.google.common.cache.LoadingCache
 import io.thelandscape.krawler.crawler.KrawlConfig
 import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.AtomicInteger
-import java.util.concurrent.locks.ReentrantLock
-import kotlin.concurrent.withLock
 
 class ScheduledQueue(private val queues: List<KrawlQueueIf>, private val config: KrawlConfig) {
 
-    private val popLock: ReentrantLock = ReentrantLock()
+    private val popLock: Any = Any()
     private var popSelector: Int = 0
-        get() = popLock.withLock { field }
-        set(value) = popLock.withLock { field = value }
 
-    private val pushLock: ReentrantLock = ReentrantLock()
+    private val pushLock: Any = Any()
     private var pushSelector: Int = 0
-        get() = pushLock.withLock { field }
-        set(value) = pushLock.withLock { field = value }
 
     private val pushAffinityCache: LoadingCache<String, Int> = CacheBuilder.newBuilder()
             .maximumSize(1000)
             .expireAfterAccess(10, TimeUnit.MINUTES)
             .build(
             object : CacheLoader<String, Int>() {
                 override fun load(key: String): Int {
-                    return pushSelector++ % queues.size
+                    return synchronized(pushLock) { pushSelector++ % queues.size }
                 }
             }
     )
@@ -80,7 +73,7 @@ class ScheduledQueue(private val queues: List<KrawlQueueIf>, private val config:
             pushSelector
 
         // Pop a URL off the queue
-        var entry: KrawlQueueEntry? = queues[popSelector++ % modVal].pop()
+        var entry: KrawlQueueEntry? = synchronized(popLock) { queues[popSelector++ % modVal].pop() }
 
         // Multiply by queue size, we'll check all of the queues each second
         while (entry == null && emptyQueueWaitCount < (config.emptyQueueWaitTime * modVal)) {
@@ -89,7 +82,7 @@ class ScheduledQueue(private val queues: List<KrawlQueueIf>, private val config:
             emptyQueueWaitCount++
 
             // Try to pop again
-            entry = queues[popSelector++ % modVal].pop()
+            entry = synchronized(popLock) { queues[popSelector++ % modVal].pop() }
         }
 
         return entry
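ScheduledQueue gets the same treatment for its round-robin selectors: popSelector and pushSelector were previously guarded only at the get/set level, so the selector bump and the queues[...] access could interleave across threads, leaving some queues drained repeatedly and others skipped. After this change the index selection and the pop (or the affinity-cache load) run inside a single synchronized block. A reduced sketch of that pattern, with invented names:

// Minimal round-robin dispatcher: the index bump and the element access
// form one critical section, so concurrent callers rotate evenly.
class RoundRobin<T>(private val targets: List<T>) {
    private val lock = Any()
    private var selector = 0

    fun next(): T = synchronized(lock) { targets[selector++ % targets.size] }
}

fun main() {
    val rr = RoundRobin(listOf("queue-0", "queue-1", "queue-2"))
    repeat(6) { println(rr.next()) } // queue-0, queue-1, queue-2, queue-0, ...
}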
18 changes: 9 additions & 9 deletions src/main/kotlin/io/thelandscape/krawler/crawler/Krawler.kt
@@ -33,8 +33,6 @@ import io.thelandscape.krawler.robots.RobotsConfig
 import java.util.concurrent.LinkedBlockingQueue
 import java.util.concurrent.ThreadPoolExecutor
 import java.util.concurrent.TimeUnit
-import java.util.concurrent.locks.ReentrantLock
-import kotlin.concurrent.withLock
 
 /**
  * Class defines the operations and data structures used to perform a web crawl.
@@ -244,12 +242,11 @@ abstract class Krawler(val config: KrawlConfig = KrawlConfig(),
      * Private members
      */
     // Lock for the synchronized block to determine when to stop
-    val syncLock: Any = Any()
+    private val syncLock: Any = Any()
 
-    private val visitLock: ReentrantLock = ReentrantLock()
-    var visitCount: Int = 0
-        get() = visitLock.withLock { field }
-        private set(value) = visitLock.withLock { field = value }
+    /** This should be utilized within a locked or synchronized block **/
+    @Volatile var visitCount: Int = 0
+        private set
 
     // Set of redirect codes
     private val redirectCodes: Set<Int> = setOf(300, 301, 302, 303, 307, 308)
@@ -284,8 +281,8 @@ abstract class Krawler(val config: KrawlConfig = KrawlConfig(),

         // Check if we should continue crawling
         synchronized(syncLock) {
-            // This will also set continueCrawling to false if the totalPages has been hit
-            if (++visitCount > config.totalPages) return
+            if (visitCount == config.totalPages + 1) return
+            visitCount++
         }
 
         val doc: RequestResponse = requestProvider.getUrl(krawlUrl)
@@ -335,6 +332,9 @@ abstract class Krawler(val config: KrawlConfig = KrawlConfig(),
         val locStr: String = doc.headers["location"] ?: return listOf()
         val location: KrawlUrl = KrawlUrl.new(locStr, url)
 
+        // We won't count it as a visit since it was a redirect
+        synchronized(syncLock) { visitCount-- }
+
         return listOf(KrawlQueueEntry(location.canonicalForm, history, depth))
     }
 
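In Krawler.kt the page-limit check and the visit counting must be one atomic step: if several threads pass the check before any of them increments, the crawl overshoots config.totalPages (over-crawl), and the redirect path that un-counts a visit can leave the tally below the number of pages actually fetched (under-crawl). The commit therefore makes visitCount @Volatile with a private setter and routes every check, increment, and decrement through synchronized(syncLock). A standalone sketch of that check-then-act pattern (VisitBudget and its methods are invented for illustration):

// Sketch of a visit budget guarded by a single monitor, mirroring the
// check-then-increment done under synchronized(syncLock) above.
class VisitBudget(private val totalPages: Int) {
    private val lock = Any()

    @Volatile
    var visitCount: Int = 0
        private set

    /** Atomically checks the budget and counts the visit; false means stop crawling. */
    fun tryVisit(): Boolean = synchronized(lock) {
        if (visitCount >= totalPages) {
            false
        } else {
            visitCount++
            true
        }
    }

    /** Removes a previously counted visit, e.g. when the response was only a redirect. */
    fun uncount() {
        synchronized(lock) { visitCount-- }
    }
}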
