Rework doCrawl to be a pipeline.
brianmadden committed May 12, 2017
1 parent 5ce5aef commit d1dcf8f
Showing 6 changed files with 99 additions and 82 deletions.
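
In short, the commit replaces the fire-and-forget schedule()/doCrawl() recursion with a three-stage channel pipeline: produceUrls() pops queue entries and sends them downstream, produceKrawlActions() fetches each entry and maps it to a KrawlAction (Visit, Check, or Noop), and doCrawl() consumes the actions and dispatches the visit/check callbacks. A minimal, self-contained sketch of that pattern follows; it is written against the current kotlinx.coroutines API rather than the pre-1.0 experimental package this commit targets (where produce returned a ProducerJob), and Entry/Action are illustrative stand-ins for Krawler's types, not the library's own.

    import kotlinx.coroutines.*
    import kotlinx.coroutines.channels.ReceiveChannel
    import kotlinx.coroutines.channels.produce

    // Stand-ins for KrawlQueueEntry / KrawlAction, for illustration only.
    data class Entry(val url: String)
    sealed class Action {
        data class Visit(val url: String) : Action()
        object Noop : Action()
    }

    @OptIn(ExperimentalCoroutinesApi::class)
    fun main() = runBlocking {
        // Stage 1: emit entries; produce closes the channel when the block ends.
        val entries: ReceiveChannel<Entry> = produce {
            listOf("http://a", "http://b").forEach { send(Entry(it)) }
        }

        // Stage 2: map each entry to an action (the real pipeline fetches here).
        val actions: ReceiveChannel<Action> = produce {
            for (e in entries) send(Action.Visit(e.url))
        }

        // Stage 3: consume actions and dispatch.
        for (a in actions) when (a) {
            is Action.Visit -> println("visit ${a.url}")
            Action.Noop -> Unit
        }
    }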
2 changes: 1 addition & 1 deletion example/src/main/kotlin/main.kt
@@ -20,7 +20,7 @@ import io.thelandscape.krawler.crawler.KrawlConfig
 
 fun main(args: Array<String>) {
 
-    val config: KrawlConfig = KrawlConfig(totalPages = 1000)
+    val config: KrawlConfig = KrawlConfig(totalPages = 100)
     val k = SimpleExample(config)
 
     // Add a few different hosts to the whitelist
21 changes: 7 additions & 14 deletions src/main/kotlin/io/thelandscape/krawler/crawler/KrawlQueue/Dao.kt
@@ -27,8 +27,6 @@ import com.github.andrewoma.kwery.mapper.*
 import com.github.andrewoma.kwery.mapper.util.camelToLowerUnderscore
 import io.thelandscape.krawler.crawler.History.KrawlHistoryEntry
 import io.thelandscape.krawler.crawler.History.KrawlHistoryHSQLDao
-import kotlinx.coroutines.experimental.sync.Mutex
-import java.util.concurrent.TimeUnit
 
 object historyConverter :
     SimpleConverter<KrawlHistoryEntry>( { row, c -> KrawlHistoryEntry(row.long(c)) }, KrawlHistoryEntry::id)
@@ -62,8 +60,7 @@ class KrawlQueueHSQLDao(name: String,
                 "(url VARCHAR(2048) NOT NULL, parent INT, depth INT, timestamp TIMESTAMP)")
     }
 
-    private val syncMutex = Mutex()
-    override suspend fun pop(): KrawlQueueEntry? {
+    override fun pop(): KrawlQueueEntry? {
         val historyEntry = Type(KrawlHistoryEntry::id, { histDao.findByIds(it) })
         val queueEntry = Type(
             KrawlQueueEntry::url,
@@ -76,20 +73,16 @@
 
         val selectSql = "SELECT TOP 1 $columns FROM ${table.name}"
         var out: List<KrawlQueueEntry> = listOf()
-        // Synchronize this to prevent race conditions between popping and deleting
-        try {
-            syncMutex.lock()
-            out = session.select(selectSql, mapper = table.rowMapper())
-            if (out.isNotEmpty())
-                session.update("DELETE FROM ${table.name} WHERE url = :id", mapOf("id" to out.first().url))
-        } finally {
-            syncMutex.unlock()
-        }
 
+        // No need to synchronize here, current implementation only uses a single thread to pop URLs
+        out = session.select(selectSql, mapper = table.rowMapper())
+        if (out.isNotEmpty())
+            session.update("DELETE FROM ${table.name} WHERE url = :id", mapOf("id" to out.first().url))
 
         return out.fetch(Node.all).firstOrNull()
     }
 
-    override suspend fun push(urls: List<KrawlQueueEntry>): List<KrawlQueueEntry> {
+    override fun push(urls: List<KrawlQueueEntry>): List<KrawlQueueEntry> {
         if (urls.isNotEmpty())
             return this.batchInsert(urls)
 
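
Why the mutex could be dropped: pop() performs a SELECT followed by a DELETE, which is only safe without locking while exactly one consumer calls it, as the new single-producer pipeline guarantees. A rough sketch of that pop-then-delete shape in plain JDBC (the table name krawlQueue is hypothetical; Krawler itself goes through kwery's Session as shown above):

    import java.sql.Connection

    // Single-consumer pop: read the head row, then delete it by URL.
    // With two concurrent callers, both could read the same row before
    // either deletes it, which is exactly what the old mutex prevented.
    fun popUrl(conn: Connection): String? {
        conn.createStatement().use { st ->
            st.executeQuery("SELECT TOP 1 url FROM krawlQueue").use { rs ->
                if (!rs.next()) return null
                val url = rs.getString("url")
                conn.prepareStatement("DELETE FROM krawlQueue WHERE url = ?").use { del ->
                    del.setString(1, url)
                    del.executeUpdate()
                }
                return url
            }
        }
    }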
4 changes: 2 additions & 2 deletions src/main/kotlin/io/thelandscape/krawler/crawler/KrawlQueue/KrawlQueueIf.kt
@@ -22,6 +22,6 @@ package io.thelandscape.krawler.crawler.KrawlQueue
  * Interface representing a KrawlQueue
  */
 interface KrawlQueueIf {
-    suspend fun pop (): KrawlQueueEntry?
-    suspend fun push (urls: List<KrawlQueueEntry>): List<KrawlQueueEntry>
+    fun pop (): KrawlQueueEntry?
+    fun push (urls: List<KrawlQueueEntry>): List<KrawlQueueEntry>
 }
22 changes: 10 additions & 12 deletions src/main/kotlin/io/thelandscape/krawler/crawler/KrawlQueue/ScheduledQueue.kt
@@ -22,26 +22,24 @@ import com.google.common.cache.CacheBuilder
 import com.google.common.cache.CacheLoader
 import com.google.common.cache.LoadingCache
 import io.thelandscape.krawler.crawler.KrawlConfig
-import kotlinx.coroutines.experimental.delay
 import org.apache.logging.log4j.LogManager
 import org.apache.logging.log4j.Logger
 import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.AtomicInteger
 
 class ScheduledQueue(private val queues: List<KrawlQueueIf>, private val config: KrawlConfig) {
 
     val logger: Logger = LogManager.getLogger()
 
-    private var popSelector: AtomicInteger = AtomicInteger(0)
-    private val pushSelector: AtomicInteger = AtomicInteger(0)
+    private var popSelector: Int = 0
+    private var pushSelector: Int = 0
 
     private val pushAffinityCache: LoadingCache<String, Int> = CacheBuilder.newBuilder()
         .maximumSize(1000)
         .expireAfterAccess(10, TimeUnit.MINUTES)
         .build(
             object : CacheLoader<String, Int>() {
                 override fun load(key: String): Int {
-                    return pushSelector.incrementAndGet() % queues.size
+                    return pushSelector % queues.size
                 }
             }
         )
Expand All @@ -55,7 +53,7 @@ class ScheduledQueue(private val queues: List<KrawlQueueIf>, private val config:
*
* @return [List]: List of KrawlQueueEntries that were pushed
*/
suspend fun push(referringDomain: String, entries: List<KrawlQueueEntry>): List<KrawlQueueEntry> {
fun push(referringDomain: String, entries: List<KrawlQueueEntry>): List<KrawlQueueEntry> {
val affinity = pushAffinityCache[referringDomain]
return queues[affinity].push(entries)
}
@@ -66,25 +64,25 @@ class ScheduledQueue(private val queues: List<KrawlQueueIf>, private val config:
      *
      * @return [KrawlQueueEntry?]: A single KrawlQueueEntry if available, null otherwise
      */
-    suspend fun pop(): KrawlQueueEntry? {
+    fun pop(): KrawlQueueEntry? {
         var emptyQueueWaitCount: Long = 0
 
         // This should only be 0 in the case of testing, so this is kind of a hack
-        val modVal = if (pushSelector.get() > queues.size || pushSelector.get() == 0)
+        val modVal = if (pushSelector > queues.size || pushSelector == 0)
             queues.size
         else
-            pushSelector.get()
+            pushSelector
 
-        var entry: KrawlQueueEntry? = queues[popSelector.incrementAndGet() % modVal].pop()
+        var entry: KrawlQueueEntry? = queues[popSelector % modVal].pop()
 
         // Multiply by queue size, we'll check all of the queues each second
         while (entry == null && emptyQueueWaitCount < (config.emptyQueueWaitTime * modVal)) {
             logger.debug("Waiting on an entry... Wait count: ${emptyQueueWaitCount / modVal}")
             // Wait for the configured period for more URLs
-            delay(Math.ceil(1000.0 / modVal).toLong())
+            Thread.sleep(Math.ceil(1000.0 / modVal).toLong())
             emptyQueueWaitCount++
 
-            entry = queues[popSelector.incrementAndGet() % modVal].pop()
+            entry = queues[popSelector % modVal].pop()
         }
 
         return entry
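
For context, the push-affinity cache above pins each referring domain to one backing queue, so a domain's URLs keep landing on the same queue. A stand-alone sketch of the same Guava LoadingCache pattern (queueCount and the post-increment selector are illustrative choices, not the class's exact arithmetic):

    import com.google.common.cache.CacheBuilder
    import com.google.common.cache.CacheLoader
    import com.google.common.cache.LoadingCache
    import java.util.concurrent.TimeUnit

    fun main() {
        val queueCount = 4
        var selector = 0

        val affinity: LoadingCache<String, Int> = CacheBuilder.newBuilder()
            .maximumSize(1000)
            .expireAfterAccess(10, TimeUnit.MINUTES)
            .build(object : CacheLoader<String, Int>() {
                // First sighting of a domain assigns the next queue, round-robin;
                // later lookups hit the cache and return the same index.
                override fun load(key: String): Int = selector++ % queueCount
            })

        println(affinity["example.org"]) // 0
        println(affinity["example.com"]) // 1
        println(affinity["example.org"]) // still 0, served from the cache
    }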
125 changes: 76 additions & 49 deletions src/main/kotlin/io/thelandscape/krawler/crawler/Krawler.kt
@@ -31,9 +31,9 @@ import io.thelandscape.krawler.robots.RoboMinder
 import io.thelandscape.krawler.robots.RoboMinderIf
 import io.thelandscape.krawler.robots.RobotsConfig
 import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.channels.*
 import org.apache.logging.log4j.LogManager
 import org.apache.logging.log4j.Logger
-import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.atomic.AtomicInteger
 
 /**
@@ -201,7 +201,9 @@ abstract class Krawler(val config: KrawlConfig = KrawlConfig(),
         }
 
         onCrawlStart()
-        (1..100).map { schedule() }
+        val urls: ProducerJob<KrawlQueueEntry> = produceUrls()
+        val actions: ProducerJob<KrawlAction> = produceKrawlActions(urls)
+        doCrawl(actions)
         job.join()
     }
 
@@ -230,7 +232,10 @@ abstract class Krawler(val config: KrawlConfig = KrawlConfig(),
         }
 
         onCrawlStart()
-        (1..100).map { schedule() }
+        val urls: ProducerJob<KrawlQueueEntry> = produceUrls()
+        val actions: ProducerJob<KrawlAction> = produceKrawlActions(urls)
+        doCrawl(actions)
+        onCrawlEnd()
     }
 
 
Expand All @@ -253,41 +258,69 @@ abstract class Krawler(val config: KrawlConfig = KrawlConfig(),
* Private members
*/

internal fun schedule() = launch(CommonPool + job) {
try { doCrawl() }
catch(e: Throwable) {
logger.debug(e.printStackTrace())
visitCount.decrementAndGet()
}
sealed class KrawlAction {
data class Visit(val krawlUrl: KrawlUrl, val doc: KrawlDocument): KrawlAction()
data class Check(val krawlUrl: KrawlUrl, val statusCode: Int): KrawlAction()
class Noop: KrawlAction()
}


internal val visitCount: AtomicInteger = AtomicInteger(0)
internal val finishedCount: AtomicInteger = AtomicInteger(0)
internal val wasShutdown: AtomicBoolean = AtomicBoolean(false)

// Set of redirect codes
private val redirectCodes: Set<Int> = setOf(300, 301, 302, 303, 307, 308)
internal suspend fun doCrawl() {
suspend fun produceUrls(): ProducerJob<KrawlQueueEntry> = produce(CommonPool + job) {
while(true) {
val entry: KrawlQueueEntry? = scheduledQueue.pop()

if (entry == null) {
channel.close()
logger.debug("Closing produceUrls")
break
}

logger.debug("Sending ${entry.url}")
send(entry)
}
}

suspend fun produceKrawlActions(entries: ReceiveChannel<KrawlQueueEntry>): ProducerJob<KrawlAction>
= produce(CommonPool + job) {

var visited: Int = 0

entries.consumeEach { (url, parent, depth) ->
if (visited == config.totalPages && config.totalPages > 0) {
logger.debug("Closing produceKrawlActions")
channel.close()
stop()
}

val entry: KrawlQueueEntry = scheduledQueue.pop() ?: return
val krawlUrl: KrawlUrl = KrawlUrl.new(url)
val parentKrawlUrl: KrawlUrl = KrawlUrl.new(parent.url)

val krawlUrl: KrawlUrl = KrawlUrl.new(entry.url)
val depth: Int = entry.depth
val parent: KrawlUrl = KrawlUrl.new(entry.parent.url)
val action: KrawlAction = fetch(krawlUrl, depth, parentKrawlUrl).await()

if (action !is KrawlAction.Noop)
visited++

send(action)
}
}

fun fetch(krawlUrl: KrawlUrl, depth: Int, parent: KrawlUrl): Deferred<KrawlAction> = async(CommonPool + job) {

// Make sure we're within depth limit
if (depth >= config.maxDepth && config.maxDepth != -1) {

logger.debug("Max depth!")
return
return@async KrawlAction.Noop()
}

// Do a history check
val history: KrawlHistoryEntry =
if (krawlHistory!!.hasBeenSeen(krawlUrl)) { // If it has been seen
onRepeatVisit(krawlUrl, parent)
schedule()
logger.debug("History says no")
return
return@async KrawlAction.Noop()
} else {
krawlHistory!!.insert(krawlUrl)
}
@@ -299,57 +332,51 @@ abstract class Krawler(val config: KrawlConfig = KrawlConfig(),
         if (visit || check) {
             // If we're respecting robots.txt check if it's ok to visit this page
             if (config.respectRobotsTxt && !minder.isSafeToVisit(krawlUrl)) {
-                schedule()
                 logger.debug("Robots says no")
-                return
+                return@async KrawlAction.Noop()
             }
 
-            if ((visitCount.incrementAndGet() > config.totalPages) && (config.totalPages > -1)) {
-                logger.debug("Max visit limit reached")
-                return
+            val doc: RequestResponse = if (visit) {
+                requestProvider.getUrl(krawlUrl).await()
+            } else {
+                requestProvider.checkUrl(krawlUrl).await()
             }
 
-            val doc: RequestResponse = requestProvider.getUrl(krawlUrl).await()
-
             // If there was an error on trying to get the doc, call content fetch error
             if (doc is ErrorResponse) {
                 onContentFetchError(krawlUrl, doc.reason)
-                schedule()
                 logger.debug("Content fetch error!")
-                return
+                return@async KrawlAction.Noop()
             }
 
             // If there was an error parsing the response, still a content fetch error
            if (doc !is KrawlDocument) {
                 onContentFetchError(krawlUrl, "Krawler was unable to parse the response from the server.")
-                schedule()
                 logger.debug("content fetch error2")
-                return
+                return@async KrawlAction.Noop()
             }
 
             val links = harvestLinks(doc, krawlUrl, history, depth)
             scheduledQueue.push(krawlUrl.domain, links)
 
             // Finally call visit
             if (visit)
-                visit(krawlUrl, doc)
-
-            if (check)
-                check(krawlUrl, doc.statusCode)
-
-            if ((finishedCount.incrementAndGet() == config.totalPages) && (config.totalPages > -1)) {
-                logger.debug("cancelling jobs")
-                if (!wasShutdown.getAndSet(true))
-                    try {
-                        job.cancel()
-                    } catch (e: CancellationException) {
-                        // Do nothing
-                    }
-                return
-            }
+                return@async KrawlAction.Visit(krawlUrl, doc)
+            else
+                return@async KrawlAction.Check(krawlUrl, doc.statusCode)
         }
 
-        schedule()
+        return@async KrawlAction.Noop()
     }
 
+    // Set of redirect codes
+    private val redirectCodes: Set<Int> = setOf(300, 301, 302, 303, 307, 308)
+    internal suspend fun doCrawl(channel: ReceiveChannel<KrawlAction>) {
+        channel.consumeEach { action ->
+            when(action) {
+                is KrawlAction.Visit -> launch(CommonPool + job) { visit(action.krawlUrl, action.doc) }
+                is KrawlAction.Check -> launch(CommonPool + job) { check(action.krawlUrl, action.statusCode) }
+            }
+        }
+    }

/**
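
Worth noting: the total-page cap is now enforced by a plain local counter inside produceKrawlActions, replacing the old visitCount/finishedCount AtomicIntegers and the wasShutdown/job.cancel() shutdown dance; only one coroutine touches the counter, so no synchronization is needed. A small sketch of the close-the-channel-on-limit idea, with a hypothetical limit and plain integers standing in for crawl actions:

    import kotlinx.coroutines.*
    import kotlinx.coroutines.channels.produce

    @OptIn(ExperimentalCoroutinesApi::class)
    fun main() = runBlocking {
        val limit = 3
        var emitted = 0 // a plain Int is fine: only this coroutine mutates it

        val numbers = produce {
            var n = 0
            while (true) {
                if (emitted == limit) {
                    channel.close() // downstream consumers drain and then stop
                    break
                }
                send(n++)
                emitted++
            }
        }

        for (x in numbers) println(x) // prints 0, 1, 2
    }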
7 changes: 3 additions & 4 deletions src/main/kotlin/io/thelandscape/krawler/http/Requests.kt
@@ -20,7 +20,9 @@ package io.thelandscape.krawler.http
 
 import io.thelandscape.krawler.crawler.KrawlConfig
 import io.thelandscape.krawler.robots.RobotsTxt
-import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.CommonPool
+import kotlinx.coroutines.experimental.Deferred
+import kotlinx.coroutines.experimental.async
 import kotlinx.coroutines.experimental.sync.Mutex
 import org.apache.http.HttpRequest
 import org.apache.http.HttpResponse
@@ -40,9 +42,6 @@ import org.apache.http.protocol.HttpContext
 import org.apache.http.ssl.SSLContextBuilder
 import java.security.cert.X509Certificate
 import java.time.Instant
-import java.util.concurrent.locks.ReentrantReadWriteLock
-import kotlin.concurrent.read
-import kotlin.concurrent.write
 
 interface RequestProviderIf {
     /**