Skip to content

Commit

Permalink
Working pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
brianmadden committed May 13, 2017
1 parent 5a47193 commit 1bd18fd
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 68 deletions.
2 changes: 1 addition & 1 deletion example/src/main/kotlin/main.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import io.thelandscape.krawler.crawler.KrawlConfig

fun main(args: Array<String>) {

val config: KrawlConfig = KrawlConfig(totalPages = 100)
val config: KrawlConfig = KrawlConfig(totalPages = 1000)
val k = SimpleExample(config)

// Add a few different hosts to the whitelist
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,18 @@ class ScheduledQueue(private val queues: List<KrawlQueueIf>,
private val config: KrawlConfig,
private val jobContext: Job) {

val logger: Logger = LogManager.getLogger()

private val logger: Logger = LogManager.getLogger()

val krawlQueueEntryChannel: Channel<KrawlQueueEntry> = Channel<KrawlQueueEntry>()

// Launch one coroutine per queue, each with the context CommonPool + jobContext.
// `it` is the repeat index, so each coroutine runs pop() for exactly one queue index.
init {
repeat(queues.size) {
launch(CommonPool + jobContext) {
pop(it)
}
}
}

private var pushSelector: Int = 0

private val pushAffinityCache: LoadingCache<String, Int> = CacheBuilder.newBuilder()
Expand Down Expand Up @@ -64,43 +74,25 @@ class ScheduledQueue(private val queues: List<KrawlQueueIf>,
}

/**
* Pops a KrawlQueueEntry from the first queue with an entry available. This method
* will rotate through the queues in round robin fashion to try to increase parallelism.
* Pops a KrawlQueueEntry from a KrawlQueue.
*
* @return [KrawlQueueEntry?]: A single KrawlQueueEntry if available, null otherwise
* Does not return a value: each popped [KrawlQueueEntry] is sent to [krawlQueueEntryChannel].
*/
suspend fun pop(index: Int, channel: Channel<KrawlQueueEntry>) {
suspend fun pop(index: Int) {
var emptyQueueWaitCount: Int = 0

while(true) {
while(true) {
logger.debug("Popping w/ queue selector: $index")
var emptyQueueWaitCount: Long = 0

var entry: KrawlQueueEntry? = queues[index].pop()
while (entry == null && emptyQueueWaitCount < (config.emptyQueueWaitTime * index)) {
// Wait for the configured period for more URLs
var entry: KrawlQueueEntry? = queues[index].pop()

while (entry == null) {
logger.debug("Delaying queue:$index for 1000...")
delay(1000)
emptyQueueWaitCount++

entry = queues[index].pop()
}

if (entry == null) {
channel.close()
return
}

channel.send(entry)

krawlQueueEntryChannel.send(entry)
}
}

fun produceKrawlQueueEntries(): Channel<KrawlQueueEntry> {

val channel: Channel<KrawlQueueEntry> = Channel()

repeat(queues.size) {
launch(CommonPool + jobContext) { pop(it, channel) }
}

return channel
}
}
}
68 changes: 36 additions & 32 deletions src/main/kotlin/io/thelandscape/krawler/crawler/Krawler.kt
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ abstract class Krawler(val config: KrawlConfig = KrawlConfig(),
* @param seedUrl List<String>: A list of seed URLs
*
*/
fun start(seedUrl: List<String>) = runBlocking(CommonPool) {
fun start(seedUrl: List<String>, blocking: Boolean = true) = runBlocking(CommonPool) {
// Convert all URLs to KrawlUrls
val krawlUrls: List<KrawlUrl> = seedUrl.map { KrawlUrl.new(it) }

Expand All @@ -201,10 +201,16 @@ abstract class Krawler(val config: KrawlConfig = KrawlConfig(),
}

onCrawlStart()
val urls: Channel<KrawlQueueEntry> = scheduledQueue.produceKrawlQueueEntries()
val actions: ProducerJob<KrawlAction> = produceKrawlActions(urls)
doCrawl(actions)
job.join()
val urls: Channel<KrawlQueueEntry> = scheduledQueue.krawlQueueEntryChannel
repeat(krawlQueues!!.size) {
launch(CommonPool + job) {
val actions: ProducerJob<KrawlAction> = produceKrawlActions(urls)
doCrawl(actions)
}
}

if (blocking)
job.join()
}

/**
Expand All @@ -224,18 +230,7 @@ abstract class Krawler(val config: KrawlConfig = KrawlConfig(),
* @param seedUrl List<String>: A list of seed URLs
*/
fun startNonblocking(seedUrl: List<String>) = runBlocking(CommonPool) {
// Convert all URLs to KrawlUrls
val krawlUrls: List<KrawlUrl> = seedUrl.map { KrawlUrl.new(it) }

krawlUrls.forEach {
scheduledQueue.push(it.domain, listOf(KrawlQueueEntry(it.canonicalForm)))
}

onCrawlStart()
val urls: Channel<KrawlQueueEntry> = scheduledQueue.produceKrawlQueueEntries()
val actions: ProducerJob<KrawlAction> = produceKrawlActions(urls)
doCrawl(actions)
onCrawlEnd()
start(seedUrl, false)
}


Expand Down Expand Up @@ -264,32 +259,41 @@ abstract class Krawler(val config: KrawlConfig = KrawlConfig(),
class Noop: KrawlAction()
}


internal val visitCount: AtomicInteger = AtomicInteger(0)
internal val finishedCount: AtomicInteger = AtomicInteger(0)

suspend fun produceKrawlActions(entries: ReceiveChannel<KrawlQueueEntry>): ProducerJob<KrawlAction>
= produce(CommonPool + job) {

var visited: Int = 0

entries.consumeEach { (url, parent, depth) ->
if (visited == config.totalPages && config.totalPages > 0) {
logger.debug("Closing produceKrawlActions")
channel.close()
stop()
}

while(true) {
// This is where we'll bail out if we don't receive an entry after some time
var timeoutCounter: Long = 0
while(entries.isEmpty) {
if (timeoutCounter++ == config.emptyQueueWaitTime) {
logger.debug("Closing channel after timeout reached")
channel.close()
}
delay(1000)
}

val (url, parent, depth) = entries.receive()

val krawlUrl: KrawlUrl = KrawlUrl.new(url)
val parentKrawlUrl: KrawlUrl = KrawlUrl.new(parent.url)

val action: KrawlAction = fetch(krawlUrl, depth, parentKrawlUrl).await()

if (action !is KrawlAction.Noop)
visited++

if (action !is KrawlAction.Noop) {
if (visitCount.getAndIncrement() == config.totalPages && config.totalPages > 0) {
logger.debug("Closing produceKrawlActions")
channel.close()
job.cancel()
return@produce
}
}

send(action)
}
}
}

fun fetch(krawlUrl: KrawlUrl, depth: Int, parent: KrawlUrl): Deferred<KrawlAction> = async(CommonPool + job) {
Expand Down Expand Up @@ -405,4 +409,4 @@ abstract class Krawler(val config: KrawlConfig = KrawlConfig(),
.map { KrawlQueueEntry(it.canonicalForm, history, depth + 1)}
).flatten()
}
}
}
10 changes: 8 additions & 2 deletions src/main/kotlin/io/thelandscape/krawler/http/Requests.kt
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ import org.apache.http.protocol.HttpContext
import org.apache.http.ssl.SSLContextBuilder
import java.security.cert.X509Certificate
import java.time.Instant
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger

interface RequestProviderIf {
/**
Expand Down Expand Up @@ -82,6 +84,8 @@ private val pcm: PoolingHttpClientConnectionManager = PoolingHttpClientConnectio
class Requests(private val krawlConfig: KrawlConfig,
private var httpClient: CloseableHttpClient? = null) : RequestProviderIf {

private val logger: Logger = LogManager.getLogger()

init {
if (httpClient == null) {
val requestConfig = RequestConfig.custom()
Expand Down Expand Up @@ -167,9 +171,11 @@ class Requests(private val krawlConfig: KrawlConfig,
myLock.lock()
try {
val reqDelta = Instant.now().toEpochMilli() - requestTracker.getTimestamp(host)
if (reqDelta >= 0 && reqDelta < krawlConfig.politenessDelay)
// Sleep until the remainder of the politeness delay has elapsed
if (reqDelta >= 0 && reqDelta < krawlConfig.politenessDelay) {
// Sleep until the remainder of the politeness delay has elapsed
logger.debug("Sleeping for ${krawlConfig.politenessDelay - reqDelta} ms for politeness.")
Thread.sleep(krawlConfig.politenessDelay - reqDelta)
}
// Set last request time for politeness
requestTracker.setTimestamp(host, Instant.now().toEpochMilli())
} finally {
Expand Down
2 changes: 1 addition & 1 deletion src/main/resources/log4j2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@
<AppenderRef ref="RollingFile"/>
</Root>
</Loggers>
</Configuration>
</Configuration>

0 comments on commit 1bd18fd

Please sign in to comment.