Commit 47c588d: Adding queue submission/removal; priorities to queue entries.

brianmadden committed Nov 21, 2017
1 parent 106e95d

Showing 14 changed files with 235 additions and 72 deletions.
10 changes: 9 additions & 1 deletion README.md
@@ -38,7 +38,7 @@ to use Krawler in your project:
maven { url "https://jitpack.io" }
}
dependencies {
-compile 'com.github.brianmadden:krawler:0.4.2'
+compile 'com.github.brianmadden:krawler:0.4.3'
}
```
@@ -98,6 +98,14 @@ Roadmap

Release Notes
=============
+**0.4.3 (2017-11-20)**
+- Added the ability to clear crawl queues by RootPageId and Age; see `Krawler#removeUrlsByRootPage`
+  and `Krawler#removeUrlsByAge`
+- Added a config option to prevent crawler shutdown on empty queues
+- Added a new single-byte priority field to `KrawlQueueEntry`. Queues will always attempt to pop the
+  lowest-priority entry available. Priority can be assigned by overriding the
+  `Krawler#assignQueuePriority` method.
+- Updated dependencies

**0.4.2 (2017-10-25)**
- Updated to Kotlin Runtime 1.1.51, kotlinx-coroutines 0.19.2
- Reworked KrawlUrl class internals to handle spaces in URLs better which should result in
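To make the new features concrete, here is a hedged sketch of how the pieces named in the 0.4.3 notes might fit together. The method names come from the notes above, but the exact override signature, the `PrioritizingKrawler` class, and the argument shapes are assumptions for illustration only:

```kotlin
import io.thelandscape.krawler.crawler.KrawlConfig
import io.thelandscape.krawler.crawler.Krawler
import io.thelandscape.krawler.crawler.KrawlQueue.KrawlQueueEntry
import java.time.LocalDateTime

// Hypothetical subclass; the other required Krawler overrides (shouldVisit,
// visit, etc.) are omitted for brevity, and this override's signature is an
// assumption based on the release notes, not a confirmed API.
class PrioritizingKrawler(config: KrawlConfig) : Krawler(config) {

    // Lower bytes pop first, so shallow pages would be crawled before deep ones.
    override fun assignQueuePriority(entry: KrawlQueueEntry): Byte =
            minOf(entry.depth, 127).toByte()
}

// Pruning the frontier later (method names from the notes, arguments assumed):
// krawler.removeUrlsByRootPage(rootPageId = 7)
// krawler.removeUrlsByAge(LocalDateTime.now().minusHours(6))
```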
20 changes: 11 additions & 9 deletions build.gradle
@@ -1,8 +1,8 @@
group 'io.thelandscape'
-version '0.4.1'
+version '0.4.3'

buildscript {
-ext.kotlin_version = '1.1.51'
+ext.kotlin_version = '1.1.60'

repositories {
mavenCentral()
@@ -35,7 +35,7 @@ project(":") {
dependencies {
compile "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version"
compile "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version"
-compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:0.19.2'
+compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:0.19.3'

compile "org.apache.httpcomponents:httpclient:4.5.2"
compile group: 'org.hsqldb', name: 'hsqldb', version: '2.3.4'
@@ -44,15 +44,17 @@ project(":") {

compile group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.8.1'

compile "com.github.andrewoma.kwery:core:0.15"
compile "com.github.andrewoma.kwery:fetcher:0.15"
compile "com.github.andrewoma.kwery:mapper:0.15"
compile "com.github.andrewoma.kwery:core:0.17"
compile "com.github.andrewoma.kwery:fetcher:0.17"
compile "com.github.andrewoma.kwery:mapper:0.17"

-compile group: 'com.zaxxer', name: 'HikariCP', version: '2.6.3'
+compile group: 'com.zaxxer', name: 'HikariCP', version: '2.7.3'

-compile group: 'com.google.guava', name: 'guava', version: '19.0'
+compile group: 'com.google.guava', name: 'guava', version: '23.4-jre'

testCompile "com.nhaarman:mockito-kotlin:1.4.0"
testCompile group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.8.1'
testCompile group: 'org.apache.logging.log4j', name: 'log4j-slf4j-impl', version: '2.8.1'
testCompile "com.nhaarman:mockito-kotlin:1.5.0"
testCompile "org.jetbrains.kotlin:kotlin-test:$kotlin_version"
testCompile "junit:junit:4.12"
}
src/main/kotlin/io/thelandscape/krawler/crawler/KrawlConfig.kt
@@ -37,6 +37,8 @@ class KrawlConfig(
// Maximum number of queued URLs - when this value is exceeded
// additional crawl requests are rejected until the queue has drained
val maximumQueueSize: Int = 1000000,
+// Shutdown on empty queues?
+val shutdownOnEmptyQueue: Boolean = true,
// Length of time (in seconds) to wait before giving up and calling it quits
emptyQueueWaitTime: Long = 10,
// The timeout in milliseconds used when requesting a connection. 0 = infinite, -1 = system default
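The new flag defaults to `true`, preserving the old shut-down-when-drained behavior. A minimal sketch of opting out, using only the constructor parameters visible in this hunk (the class takes other parameters not shown here):

```kotlin
import io.thelandscape.krawler.crawler.KrawlConfig

// Keep the crawler alive on empty queues, e.g. for a long-running crawl
// that receives new URLs from an external source after startup.
val config = KrawlConfig(
        maximumQueueSize = 1000000,
        shutdownOnEmptyQueue = false,
        emptyQueueWaitTime = 10L)
```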
25 changes: 21 additions & 4 deletions src/main/kotlin/io/thelandscape/krawler/crawler/KrawlQueue/Dao.kt
@@ -27,6 +27,9 @@ import com.github.andrewoma.kwery.mapper.*
import com.github.andrewoma.kwery.mapper.util.camelToLowerUnderscore
import io.thelandscape.krawler.crawler.History.KrawlHistoryEntry
import io.thelandscape.krawler.crawler.History.KrawlHistoryHSQLDao
+import java.sql.Timestamp
+import java.time.LocalDateTime
+import java.util.concurrent.atomic.AtomicInteger

object historyConverter :
SimpleConverter<KrawlHistoryEntry>( { row, c -> KrawlHistoryEntry(row.long(c)) }, KrawlHistoryEntry::id)
@@ -36,28 +36,33 @@ class KrawlQueueTable(name: String) : Table<KrawlQueueEntry, String>(name,
standardConverters + timeConverters + reifiedConverter(historyConverter), camelToLowerUnderscore)) {

val Url by col(KrawlQueueEntry::url, id = true)
+val RootPageId by col(KrawlQueueEntry::rootPageId)
val Parent by col (KrawlQueueEntry::parent)
val Depth by col(KrawlQueueEntry::depth)
+val Priority by col(KrawlQueueEntry::priority)
val Timestamp by col(KrawlQueueEntry::timestamp)

override fun idColumns(id: String) = setOf(Url of id)

override fun create(value: Value<KrawlQueueEntry>) =
-KrawlQueueEntry(value of Url, value of Parent, value of Depth, value of Timestamp)
+KrawlQueueEntry(value of Url, value of RootPageId, value of Parent,
+        value of Depth, value of Priority, value of Timestamp)

}

// TODO: Figure out how to allow this to take a generic KrawlHistoryIf
// rather than an HSQLDao while keeping the interface clean
class KrawlQueueHSQLDao(name: String,
session: Session,
-private val histDao: KrawlHistoryHSQLDao):
+private val histDao: KrawlHistoryHSQLDao = KrawlHistoryHSQLDao(session)):
KrawlQueueIf, AbstractDao<KrawlQueueEntry, String>(session, KrawlQueueTable(name), KrawlQueueEntry::url) {

init {
// Create queue table
session.update("CREATE TABLE IF NOT EXISTS $name " +
"(url VARCHAR(2048) NOT NULL, parent INT, depth INT, timestamp TIMESTAMP)")
"(url VARCHAR(2048) NOT NULL, root_page_id INT, parent INT, depth INT, priority TINYINT, timestamp TIMESTAMP)")

session.update("CREATE INDEX ${name}PriorityIdx ON $name (priority)")
}

override fun pop(): KrawlQueueEntry? {
@@ -71,7 +79,7 @@ class KrawlQueueHSQLDao(name: String,

fun <T> Collection<T>.fetch(node: Node) = fetcher.fetch(this, Node(node))

val selectSql = "SELECT TOP 1 $columns FROM ${table.name}"
val selectSql = "SELECT TOP 1 $columns FROM ${table.name} WHERE priority = (SELECT MIN(priority) FROM ${table.name})"

// No need to synchronize here, current implementation only uses a single thread to pop URLs
val out: List<KrawlQueueEntry> = session.select(selectSql, mapper = table.rowMapper())
@@ -87,4 +95,13 @@ class KrawlQueueHSQLDao(name: String,

return listOf()
}

+override fun deleteByRootPageId(rootPageId: Int): Int {
+    return session.update("DELETE FROM ${table.name} WHERE root_page_id = :id", mapOf("id" to rootPageId))
+}
+
+override fun deleteByAge(beforeTime: LocalDateTime): Int {
+    return session.update("DELETE FROM ${table.name} WHERE timestamp < :beforeTime",
+            mapOf("beforeTime" to Timestamp.valueOf(beforeTime)))
+}
}
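A usage sketch for the two new deletion methods, assuming a `dao` that was already constructed with a live kwery `Session` (setup omitted):

```kotlin
import java.time.LocalDateTime

// dao: KrawlQueueHSQLDao, construction omitted here

// Clear every queued URL that was seeded from root page 42.
val clearedForPage: Int = dao.deleteByRootPageId(42)

// Expire everything that has sat in the queue for more than an hour;
// the LocalDateTime is converted to java.sql.Timestamp inside deleteByAge.
val clearedStale: Int = dao.deleteByAge(LocalDateTime.now().minusHours(1))
```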
src/main/kotlin/io/thelandscape/krawler/crawler/KrawlQueue/KrawlQueueEntry.kt
@@ -21,7 +21,16 @@ package io.thelandscape.krawler.crawler.KrawlQueue
import io.thelandscape.krawler.crawler.History.KrawlHistoryEntry
import java.time.LocalDateTime

-data class KrawlQueueEntry(val url: String,
-                           val parent: KrawlHistoryEntry = KrawlHistoryEntry(),
-                           val depth: Int = 0,
-                           val timestamp: LocalDateTime = LocalDateTime.now())
+data class KrawlQueueEntry(
+        // The target crawl URL
+        val url: String,
+        // The ID of the start URL that led to this page
+        val rootPageId: Int,
+        // Direct parent of this page in the crawl
+        val parent: KrawlHistoryEntry = KrawlHistoryEntry(),
+        // Depth of the entry
+        val depth: Int = 0,
+        // Priority
+        val priority: Byte = 0,
+        // Timestamp entry was added to the queue
+        val timestamp: LocalDateTime = LocalDateTime.now())
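As an illustration of the new field, two entries that differ only in priority; given the `MIN(priority)` pop query above, the zero-priority entry drains first regardless of insertion order (the URLs and IDs here are invented):

```kotlin
val urgent = KrawlQueueEntry(url = "http://example.org/news", rootPageId = 1, priority = 0.toByte())
val casual = KrawlQueueEntry(url = "http://example.org/archive", rootPageId = 1, priority = 10.toByte())
// queue.push(listOf(casual, urgent)); queue.pop() would return `urgent` first.
```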
src/main/kotlin/io/thelandscape/krawler/crawler/KrawlQueue/KrawlQueueIf.kt
@@ -18,10 +18,18 @@

package io.thelandscape.krawler.crawler.KrawlQueue

+import java.time.LocalDateTime
+
/**
* Interface representing a KrawlQueue
*/
interface KrawlQueueIf {
+    // Pop an entry from the queue
    fun pop (): KrawlQueueEntry?
+    // Push an entry on the queue
    fun push (urls: List<KrawlQueueEntry>): List<KrawlQueueEntry>
+    // Delete all entries with a specific root page ID
+    fun deleteByRootPageId (rootPageId: Int): Int
+    // Delete all entries older than beforeTime
+    fun deleteByAge(beforeTime: LocalDateTime): Int
}
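For tests, the contract can be satisfied without HSQLDB. A minimal in-memory sketch (not part of this commit) that honors lowest-priority-first popping:

```kotlin
import java.time.LocalDateTime
import java.util.PriorityQueue

// Hypothetical test double for KrawlQueueIf.
class InMemoryKrawlQueue : KrawlQueueIf {
    // Orders entries so that poll() returns the lowest priority byte first.
    private val queue = PriorityQueue<KrawlQueueEntry>(compareBy { it.priority })

    override fun pop(): KrawlQueueEntry? = queue.poll()

    override fun push(urls: List<KrawlQueueEntry>): List<KrawlQueueEntry> {
        queue.addAll(urls)
        return urls
    }

    override fun deleteByRootPageId(rootPageId: Int): Int {
        val before = queue.size
        queue.removeAll { it.rootPageId == rootPageId }
        return before - queue.size
    }

    override fun deleteByAge(beforeTime: LocalDateTime): Int {
        val before = queue.size
        queue.removeAll { it.timestamp.isBefore(beforeTime) }
        return before - queue.size
    }
}
```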
src/main/kotlin/io/thelandscape/krawler/crawler/ScheduledQueue.kt
@@ -24,10 +24,9 @@ import com.google.common.cache.LoadingCache
import io.thelandscape.krawler.crawler.KrawlConfig
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.Channel
-import kotlinx.coroutines.experimental.channels.ProducerJob
-import kotlinx.coroutines.experimental.channels.produce
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
+import java.time.LocalDateTime
import java.util.concurrent.TimeUnit

class ScheduledQueue(private val queues: List<KrawlQueueIf>,
@@ -95,4 +94,22 @@ class ScheduledQueue(private val queues: List<KrawlQueueIf>,
krawlQueueEntryChannel.send(entry)
}
}

+/**
+ * Removes all queue entries with the specified rootPageId.
+ *
+ * @param rootPageId Int: The id of the root page
+ *
+ * @return the number of entries removed
+ */
+fun deleteByRootPageId(rootPageId: Int): Int = queues.map { it.deleteByRootPageId(rootPageId) }.sum()
+
+/**
+ * Removes all queue entries that are older than beforeTime.
+ *
+ * @param beforeTime LocalDateTime: Delete all entries prior to this timestamp
+ *
+ * @return the number of entries removed
+ */
+fun deleteByAge(beforeTime: LocalDateTime): Int = queues.map { it.deleteByAge(beforeTime) }.sum()
}
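Because `ScheduledQueue` spreads work across several backing queues, each helper simply sums the per-queue counts. A short usage sketch, assuming a constructed `scheduledQueue` (setup not shown):

```kotlin
import java.time.LocalDateTime

// scheduledQueue: ScheduledQueue wrapping one or more KrawlQueueIf shards
val removed = scheduledQueue.deleteByRootPageId(7)                          // drop one root page's frontier
val expired = scheduledQueue.deleteByAge(LocalDateTime.now().minusDays(1))  // drop day-old entries
```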
27 changes: 0 additions & 27 deletions src/main/kotlin/io/thelandscape/krawler/crawler/KrawlQueueEntry.kt

This file was deleted.

