Progress towards cleaning up deadlocks.
brianmadden committed May 10, 2017
1 parent 73c073e commit 5ce5aef
Showing 8 changed files with 80 additions and 83 deletions.
51 changes: 29 additions & 22 deletions build.gradle
@@ -6,13 +6,16 @@ buildscript {

repositories {
mavenCentral()
+ jcenter()
}
dependencies {
classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
+ classpath "com.github.jengelman.gradle.plugins:shadow:1.2.4"
}
}

allprojects {

+ apply plugin: "kotlin"
apply plugin: "java"
apply plugin: "maven"
@@ -26,26 +29,12 @@ allprojects {
url "http://dl.bintray.com/kotlin/kotlin-eap-1.1"
}
}

- task sourcesJar(type: Jar, dependsOn: classes) {
-     classifier = 'sources'
-     from sourceSets.main.allSource
- }
-
- task javadocJar(type: Jar, dependsOn: javadoc) {
-     classifier = 'javadoc'
-     from javadoc.destinationDir
- }
-
- artifacts {
-     archives sourcesJar
-     archives javadocJar
- }
}

project(":") {
dependencies {
compile "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version"
compile "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version"
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:0.15'

compile "org.apache.httpcomponents:httpclient:4.5.2"
@@ -69,19 +58,37 @@ project(":") {
testCompile "org.jetbrains.kotlin:kotlin-test:$kotlin_version"
testCompile "junit:junit:4.12"
}

+ task sourcesJar(type: Jar, dependsOn: classes) {
+     classifier = 'sources'
+     from sourceSets.main.allSource
+ }
+
+ task javadocJar(type: Jar, dependsOn: javadoc) {
+     classifier = 'javadoc'
+     from javadoc.destinationDir
+ }
+
+ artifacts {
+     archives sourcesJar
+     archives javadocJar
+ }
}

project(":example") {
apply plugin: "kotlin"
apply plugin: "com.github.johnrengelman.shadow"

jar {
manifest {
attributes(
"Manifest-Version": "1.0",
"Created-By": "@brianmadden",
"Main-Class": "MainKt")
}
}

dependencies {
compile project(":")
}
}

compileKotlin {
kotlinOptions {
languageVersion = "1.1"
apiVersion = "1.1"
}
}
4 changes: 2 additions & 2 deletions gradle/wrapper/gradle-wrapper.properties
@@ -1,6 +1,6 @@
- #Sun Mar 05 13:10:52 PST 2017
+ #Sun May 07 21:17:57 PDT 2017
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
- distributionUrl=https\://services.gradle.org/distributions/gradle-2.13-bin.zip
+ distributionUrl=https\://services.gradle.org/distributions/gradle-2.13-all.zip
KrawlQueueHSQLDao.kt
@@ -27,6 +27,7 @@ import com.github.andrewoma.kwery.mapper.*
import com.github.andrewoma.kwery.mapper.util.camelToLowerUnderscore
import io.thelandscape.krawler.crawler.History.KrawlHistoryEntry
import io.thelandscape.krawler.crawler.History.KrawlHistoryHSQLDao
+ import kotlinx.coroutines.experimental.sync.Mutex
import java.util.concurrent.TimeUnit

object historyConverter :
@@ -61,8 +62,8 @@ class KrawlQueueHSQLDao(name: String,
"(url VARCHAR(2048) NOT NULL, parent INT, depth INT, timestamp TIMESTAMP)")
}

- private val syncLock = Any()
- override fun pop(): KrawlQueueEntry? {
+ private val syncMutex = Mutex()
+ override suspend fun pop(): KrawlQueueEntry? {
val historyEntry = Type(KrawlHistoryEntry::id, { histDao.findByIds(it) })
val queueEntry = Type(
KrawlQueueEntry::url,
@@ -76,16 +77,19 @@ class KrawlQueueHSQLDao(name: String,
val selectSql = "SELECT TOP 1 $columns FROM ${table.name}"
var out: List<KrawlQueueEntry> = listOf()
// Synchronize this to prevent race conditions between popping and deleting
- synchronized(syncLock) {
+ try {
+     syncMutex.lock()
out = session.select(selectSql, mapper = table.rowMapper())
if (out.isNotEmpty())
session.update("DELETE FROM ${table.name} WHERE url = :id", mapOf("id" to out.first().url))
+ } finally {
+     syncMutex.unlock()
}

return out.fetch(Node.all).firstOrNull()
}

- override fun push(urls: List<KrawlQueueEntry>): List<KrawlQueueEntry> {
+ override suspend fun push(urls: List<KrawlQueueEntry>): List<KrawlQueueEntry> {
if (urls.isNotEmpty())
return this.batchInsert(urls)

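The pop() above now guards the select-then-delete critical section with a suspending Mutex, though the explicit lock()/unlock() pair in try/finally is easy to get wrong. For comparison, a minimal sketch of the same pattern using the withLock extension (assuming a kotlinx.coroutines version that ships it; SafeQueue and its contents are hypothetical, not part of this commit):

import kotlinx.coroutines.experimental.sync.Mutex
import kotlinx.coroutines.experimental.sync.withLock

// Hypothetical in-memory queue guarded by a suspending Mutex.
class SafeQueue<T> {
    private val mutex = Mutex()
    private val items = mutableListOf<T>()

    // withLock suspends (rather than blocking the thread) while waiting
    // for the lock, and releases it even if the body throws.
    suspend fun pop(): T? = mutex.withLock {
        if (items.isEmpty()) null else items.removeAt(0)
    }

    suspend fun push(item: T): Boolean = mutex.withLock {
        items.add(item)
    }
}

Unlike synchronized(syncLock), a coroutine waiting on the Mutex does not pin its thread, which is exactly the property that helps untangle pool-starvation deadlocks.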
KrawlQueueIf.kt
@@ -22,6 +22,6 @@ package io.thelandscape.krawler.crawler.KrawlQueue
* Interface representing a KrawlQueue
*/
interface KrawlQueueIf {
- fun pop (): KrawlQueueEntry?
- fun push (urls: List<KrawlQueueEntry>): List<KrawlQueueEntry>
+ suspend fun pop (): KrawlQueueEntry?
+ suspend fun push (urls: List<KrawlQueueEntry>): List<KrawlQueueEntry>
}
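With pop and push now suspend functions, implementations are free to use suspending primitives internally, and callers must invoke them from inside a coroutine. A minimal sketch of a consumer (drain is hypothetical, not part of the commit):

import kotlinx.coroutines.experimental.CommonPool
import kotlinx.coroutines.experimental.launch

// Suspend members of KrawlQueueIf can only be called inside a coroutine,
// so the drain loop runs in one launched on CommonPool.
fun drain(queue: KrawlQueueIf) = launch(CommonPool) {
    var entry = queue.pop()
    while (entry != null) {
        // ... handle the entry here ...
        entry = queue.pop()
    }
}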
ScheduledQueue.kt
@@ -22,33 +22,26 @@ import com.google.common.cache.CacheBuilder
import com.google.common.cache.CacheLoader
import com.google.common.cache.LoadingCache
import io.thelandscape.krawler.crawler.KrawlConfig
- import kotlinx.coroutines.experimental.*
- import kotlinx.coroutines.experimental.sync.Mutex
+ import kotlinx.coroutines.experimental.delay
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
import java.util.concurrent.TimeUnit
+ import java.util.concurrent.atomic.AtomicInteger

class ScheduledQueue(private val queues: List<KrawlQueueIf>, private val config: KrawlConfig) {

val logger: Logger = LogManager.getLogger()

- private val popLock: Mutex = Mutex()
- private var popSelector: Int = 0

- private val pushLock: Mutex = Mutex()
- private var pushSelector: Int = 0
+ private var popSelector: AtomicInteger = AtomicInteger(0)
+ private val pushSelector: AtomicInteger = AtomicInteger(0)

private val pushAffinityCache: LoadingCache<String, Int> = CacheBuilder.newBuilder()
.maximumSize(1000)
.expireAfterAccess(10, TimeUnit.MINUTES)
.build(
object : CacheLoader<String, Int>() {
override fun load(key: String): Int {
- return runBlocking(CommonPool) {
-     pushLock.lock()
-     try { pushSelector++ % queues.size }
-     finally {pushLock.unlock()}
- }
+ return pushSelector.incrementAndGet() % queues.size
}
}
)
@@ -77,18 +70,12 @@ class ScheduledQueue(private val queues: List<KrawlQueueIf>, private val config: KrawlConfig) {
var emptyQueueWaitCount: Long = 0

// This should only be 0 in the case of testing, so this is kind of a hack
- val modVal = if (pushSelector > queues.size || pushSelector == 0)
+ val modVal = if (pushSelector.get() > queues.size || pushSelector.get() == 0)
queues.size
else
- pushSelector
+ pushSelector.get()

// Pop a URL off the queue
- popLock.lock()
- var entry: KrawlQueueEntry? = try {
-     queues[popSelector++ % modVal].pop()
- } finally {
-     popLock.unlock()
- }
+ var entry: KrawlQueueEntry? = queues[popSelector.incrementAndGet() % modVal].pop()

// Multiply by queue size, we'll check all of the queues each second
while (entry == null && emptyQueueWaitCount < (config.emptyQueueWaitTime * modVal)) {
@@ -97,13 +84,7 @@ class ScheduledQueue(private val queues: List<KrawlQueueIf>, private val config: KrawlConfig) {
delay(Math.ceil(1000.0 / modVal).toLong())
emptyQueueWaitCount++

- // Try to pop again
- popLock.lock()
- entry = try {
-     queues[popSelector++ % modVal].pop()
- } finally {
-     popLock.unlock()
- }
+ entry = queues[popSelector.incrementAndGet() % modVal].pop()
}

return entry
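Both selectors are now AtomicIntegers, so round-robin queue selection proceeds without taking any lock. A minimal sketch of the idea in isolation (RoundRobin is hypothetical):

import java.util.concurrent.atomic.AtomicInteger

// Lock-free round-robin selection over a fixed list of targets.
class RoundRobin<T>(private val targets: List<T>) {
    private val selector = AtomicInteger(0)

    // incrementAndGet is atomic, so concurrent callers each observe a
    // distinct counter value and no lock is needed.
    fun next(): T = targets[selector.incrementAndGet() % targets.size]
}

One caveat with this shape: once the counter wraps past Int.MAX_VALUE the modulo can go negative; Math.floorMod(selector.incrementAndGet(), targets.size) would avoid that.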
8 changes: 6 additions & 2 deletions src/main/kotlin/io/thelandscape/krawler/crawler/Krawler.kt
@@ -40,7 +40,11 @@ import java.util.concurrent.atomic.AtomicInteger
* Class defines the operations and data structures used to perform a web crawl.
*
* @param config: A KrawlConfig object to control the limits and settings of the crawler
- * @param queue: A KrawlQueueIf provider, by default this will be a HSQL backed queue defined in the Dao
+ * @param krawlHistory: KrawlHistoryIf provider, by default this will be a HSQL backed table
+ * @param krawlQueues: A KrawlQueueIf provider, by default this will be a HSQL backed queue
+ * @param robotsConfig: Configuration of the robots.txt management
+ * @param requestProvider: RequestProviderIf provider, default is Requests class
+ * @param job: Job context that threads will run in.
*
*/
abstract class Krawler(val config: KrawlConfig = KrawlConfig(),
@@ -305,7 +309,7 @@ abstract class Krawler(val config: KrawlConfig = KrawlConfig(),
return
}

- val doc: RequestResponse = requestProvider.getUrl(krawlUrl)
+ val doc: RequestResponse = requestProvider.getUrl(krawlUrl).await()

// If there was an error on trying to get the doc, call content fetch error
if (doc is ErrorResponse) {
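Since getUrl now returns a Deferred<RequestResponse>, the crawl loop suspends on await() instead of blocking on the HTTP call. A minimal sketch of the caller-side pattern (fetchPage is a hypothetical stand-in for requestProvider.getUrl):

import kotlinx.coroutines.experimental.CommonPool
import kotlinx.coroutines.experimental.Deferred
import kotlinx.coroutines.experimental.async
import kotlinx.coroutines.experimental.runBlocking

// Hypothetical provider that starts work immediately and hands back a Deferred.
fun fetchPage(url: String): Deferred<String> = async(CommonPool) {
    "<html>stub response for $url</html>"
}

fun main(args: Array<String>) = runBlocking<Unit> {
    // await() suspends this coroutine until the result is ready,
    // without tying up the underlying thread.
    val page = fetchPage("http://example.org").await()
    println(page)
}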
36 changes: 17 additions & 19 deletions src/main/kotlin/io/thelandscape/krawler/http/Requests.kt
@@ -20,10 +20,7 @@ package io.thelandscape.krawler.http

import io.thelandscape.krawler.crawler.KrawlConfig
import io.thelandscape.krawler.robots.RobotsTxt
- import kotlinx.coroutines.experimental.CommonPool
- import kotlinx.coroutines.experimental.async
- import kotlinx.coroutines.experimental.launch
- import kotlinx.coroutines.experimental.runBlocking
+ import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.sync.Mutex
import org.apache.http.HttpRequest
import org.apache.http.HttpResponse
@@ -51,17 +48,17 @@ interface RequestProviderIf {
/**
* Method to check the status code of a URL
*/
- fun checkUrl(url: KrawlUrl): RequestResponse
+ fun checkUrl(url: KrawlUrl): Deferred<RequestResponse>

/**
* Method to get the contents of a URL
*/
- fun getUrl(url: KrawlUrl): RequestResponse
+ fun getUrl(url: KrawlUrl): Deferred<RequestResponse>

/**
* Method to get a robots.txt from a KrawlUrl
*/
- fun fetchRobotsTxt(url: KrawlUrl): RequestResponse
+ fun fetchRobotsTxt(url: KrawlUrl): Deferred<RequestResponse>
}

internal class HistoryTrackingRedirectStrategy: DefaultRedirectStrategy() {
@@ -120,29 +117,29 @@ class Requests(private val krawlConfig: KrawlConfig,
/** Fetch the robots.txt file from a domain
* @param url [KrawlUrl]: The URL to fetch robots from
*
- * @return [RequestResponse]: The parsed robots.txt, or ErrorResponse on error
+ * @return [Deferred<RequestResponse>]: The parsed robots.txt, or ErrorResponse on error
*/
- override fun fetchRobotsTxt(url: KrawlUrl): RequestResponse {
+ override fun fetchRobotsTxt(url: KrawlUrl): Deferred<RequestResponse> {
val robotsRequest = KrawlUrl.new("${url.hierarchicalPart}/robots.txt")
- return runBlocking(CommonPool) { makeRequest(robotsRequest, ::HttpGet, ::RobotsTxt) }
+ return makeRequest(robotsRequest, ::HttpGet, ::RobotsTxt)
}

/** Check a URL and return its status code
* @param url KrawlUrl: the url to check
*
- * @return [RequestResponse]: KrawlDocument containing the status code, or ErrorResponse on error
+ * @return [Deferred<RequestResponse>]: KrawlDocument containing the status code, or ErrorResponse on error
*/
- override fun checkUrl(url: KrawlUrl): RequestResponse = runBlocking(CommonPool) {
-     makeRequest(url, ::HttpHead, ::KrawlDocument)
+ override fun checkUrl(url: KrawlUrl): Deferred<RequestResponse> {
+     return makeRequest(url, ::HttpHead, ::KrawlDocument)
}

/** Get the contents of a URL
* @param url KrawlUrl: the URL to get the contents of
*
* @return [RequestResponse]: The parsed HttpResponse returned by the GET request
*/
- override fun getUrl(url: KrawlUrl): RequestResponse = runBlocking(CommonPool) {
-     makeRequest(url, ::HttpGet, ::KrawlDocument)
+ override fun getUrl(url: KrawlUrl): Deferred<RequestResponse> {
+     return makeRequest(url, ::HttpGet, ::KrawlDocument)
}

// Hash map to track requests and respect politeness
@@ -153,9 +150,10 @@
* @param reqFun: Function used to construct the request
* @param retFun: Function used to construct the response object
*/
- private suspend fun makeRequest(url: KrawlUrl,
-                                 reqFun: (String) -> HttpUriRequest,
-                                 retFun: (KrawlUrl, HttpResponse, HttpClientContext) -> RequestResponse): RequestResponse {
+ private fun makeRequest(url: KrawlUrl,
+                         reqFun: (String) -> HttpUriRequest,
+                         retFun: (KrawlUrl, HttpResponse, HttpClientContext) -> RequestResponse)
+         : Deferred<RequestResponse> = async(CommonPool) {

val httpContext = HttpClientContext()
httpContext.setAttribute("fullRedirectHistory", listOf<RedirectHistoryNode>())
@@ -187,7 +185,7 @@
ErrorResponse(url, e.toString())
}

- return resp
+ resp
}
}

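makeRequest now returns immediately with a Deferred by running the blocking HttpClient call inside async(CommonPool), and the old runBlocking wrappers in checkUrl/getUrl/fetchRobotsTxt disappear. A minimal sketch of that shape (fetchAsync and its body are hypothetical):

import kotlinx.coroutines.experimental.CommonPool
import kotlinx.coroutines.experimental.Deferred
import kotlinx.coroutines.experimental.async
import java.net.URL

// Starts the (blocking) fetch on CommonPool and returns a handle to the
// eventual result; callers decide when to await() it.
fun fetchAsync(url: String): Deferred<String> = async(CommonPool) {
    URL(url).readText() // blocking I/O runs on a pool thread
}

Worth noting: CommonPool is sized for CPU-bound work, so parking many blocking HTTP calls on it can starve the pool, which is the same family of deadlock this commit is chasing. A dedicated I/O thread pool is the usual alternative.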
11 changes: 7 additions & 4 deletions src/main/kotlin/io/thelandscape/krawler/robots/RoboMinder.kt
@@ -24,7 +24,7 @@ import com.google.common.cache.LoadingCache
import io.thelandscape.krawler.http.KrawlUrl
import io.thelandscape.krawler.http.RequestProviderIf
import io.thelandscape.krawler.http.RequestResponse
- import io.thelandscape.krawler.http.Requests
+ import kotlinx.coroutines.experimental.Deferred

interface RoboMinderIf {
fun isSafeToVisit(url: KrawlUrl): Boolean
@@ -44,18 +44,21 @@ class RoboMinder(private val userAgent: String,
.build(
object : CacheLoader<String, (String) -> Boolean>() {
override fun load(key: String): ((String) -> Boolean) {
- val resp: RequestResponse = fetch(key)
- return process(resp)
+ val resp: Deferred<RequestResponse> = fetch(key)
+ while(resp.isActive) { Thread.sleep(250) }
+ return process(resp.getCompleted())
}
}
)


- internal fun fetch(host: String): RequestResponse {
+ internal fun fetch(host: String): Deferred<RequestResponse> {
val robotsUrl = KrawlUrl.new("$host/robots.txt")
return request.fetchRobotsTxt(robotsUrl)
}



/**
* Process the freshly fetched robots.txt. If there are no rules pertinent to us, we'll just return null
*/
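The CacheLoader above busy-waits, polling resp.isActive every 250 ms before calling getCompleted(). Because CacheLoader.load is synchronous, another option is to block just that thread on await() via runBlocking; a minimal sketch (awaitBlocking is a hypothetical helper, not part of the commit):

import kotlinx.coroutines.experimental.Deferred
import kotlinx.coroutines.experimental.runBlocking

// Blocks the calling thread until the Deferred completes, instead of
// polling isActive on a fixed interval.
fun <T> awaitBlocking(deferred: Deferred<T>): T = runBlocking {
    deferred.await()
}

The loader body would then read return process(awaitBlocking(resp)), and completion is observed as soon as it happens rather than on the next poll tick.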
