Skip to content

Commit

Permalink
Merge branch 'master' into concurrent-message-processing
Browse files Browse the repository at this point in the history
  • Loading branch information
Nevon authored Apr 8, 2019
2 parents 27ae9dc + fc6681b commit 92afb5d
Show file tree
Hide file tree
Showing 11 changed files with 239 additions and 12 deletions.
15 changes: 8 additions & 7 deletions docs/Admin.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,12 @@ delete.topic.enable=true

## <a name="get-topic-metadata"></a> Get topic metadata

Deprecated, see [Fetch topic metadata](#fetch-topic-metadata)

## <a name="fetch-topic-metadata"></a> Fetch topic metadata

```javascript
await admin.getTopicMetadata({ topics: <Array<String> })
await admin.fetchTopicMetadata({ topics: <Array<String> })
```

`TopicsMetadata` structure:
Expand Down Expand Up @@ -102,15 +106,12 @@ await admin.getTopicMetadata({ topics: <Array<String> })

The admin client will throw an exception if any of the provided topics do not already exist.

If you omit the `topics` argument the admin client will fetch metadata for all topics
of which it is already aware (all the cluster's target topics):
If you omit the `topics` argument the admin client will fetch metadata for all topics:

```javascript
await admin.getTopicMetadata()
await admin.fetchTopicMetadata()
```



## <a name="fetch-topic-offsets"></a> Fetch topic offsets

`fetchTopicOffsets` returns most recent offset for a topic.
Expand Down Expand Up @@ -317,4 +318,4 @@ Example response:
}],
throttleTime: 0,
}
```
```
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
"lint-staged": "^6.0.0",
"mockdate": "^2.0.2",
"prettier": "^1.15.2",
"semver": "^6.0.0",
"uuid": "^3.3.2"
},
"dependencies": {
Expand Down
93 changes: 93 additions & 0 deletions src/admin/__tests__/fetchTopicMetadata.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
const createAdmin = require('../index')

const { secureRandom, createCluster, newLogger, createTopic } = require('testHelpers')

describe('Admin', () => {
let existingTopicName, numPartitions, admin, consumer

beforeEach(async () => {
existingTopicName = `test-topic-${secureRandom()}`
numPartitions = 4

await createTopic({ topic: existingTopicName, partitions: numPartitions })
})

afterEach(async () => {
await admin.disconnect()
consumer && (await consumer.disconnect())
})

describe('fetchTopicMetadata', () => {
test('throws an error if the topic name is not a valid string', async () => {
admin = createAdmin({ cluster: createCluster(), logger: newLogger() })
await expect(admin.fetchTopicMetadata({ topics: [null] })).rejects.toHaveProperty(
'message',
'Invalid topic null'
)
})

test('retrieves metadata for each partition in the topic', async () => {
const cluster = createCluster()
admin = createAdmin({ cluster, logger: newLogger() })

await admin.connect()
const { topics: topicsMetadata } = await admin.fetchTopicMetadata({
topics: [existingTopicName],
})

expect(topicsMetadata).toHaveLength(1)
const topicMetadata = topicsMetadata[0]
expect(topicMetadata).toHaveProperty('name', existingTopicName)
expect(topicMetadata.partitions).toHaveLength(numPartitions)

topicMetadata.partitions.forEach(partition => {
expect(partition).toHaveProperty('partitionId')
expect(partition).toHaveProperty('leader')
expect(partition).toHaveProperty('replicas')
expect(partition).toHaveProperty('partitionErrorCode')
expect(partition).toHaveProperty('isr')
})
})

test('by default retrieves metadata for all topics', async () => {
const cluster = createCluster()
admin = createAdmin({ cluster, logger: newLogger() })

await admin.connect()
const { topics } = await admin.fetchTopicMetadata()
expect(topics.length).toBeGreaterThanOrEqual(1)
})

test('creates a new topic if the topic does not exist and "allowAutoTopicCreation" is true', async () => {
admin = createAdmin({
cluster: createCluster({ allowAutoTopicCreation: true }),
logger: newLogger(),
})
const newTopicName = `test-topic-${secureRandom()}`

await admin.connect()
const { topics: topicsMetadata } = await admin.fetchTopicMetadata({
topics: [existingTopicName, newTopicName],
})

expect(topicsMetadata[0]).toHaveProperty('name', existingTopicName)
expect(topicsMetadata[1]).toHaveProperty('name', newTopicName)
expect(topicsMetadata[1].partitions).toHaveLength(1)
})

test('throws an error if the topic does not exist and "allowAutoTopicCreation" is false', async () => {
admin = createAdmin({
cluster: createCluster({ allowAutoTopicCreation: false }),
logger: newLogger(),
})
const newTopicName = `test-topic-${secureRandom()}`

await admin.connect()
await expect(
admin.fetchTopicMetadata({
topics: [existingTopicName, newTopicName],
})
).rejects.toHaveProperty('message', 'This server does not host this topic-partition')
})
})
})
52 changes: 51 additions & 1 deletion src/admin/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ module.exports = ({

await cluster.refreshMetadata()
} catch (e) {
if (e.type === 'NOT_CONTROLLER') {
if (['NOT_CONTROLLER', 'UNKNOWN_TOPIC_OR_PARTITION'].includes(e.type)) {
logger.warn('Could not delete topics', { error: e.message, retryCount, retryTime })
throw e
}
Expand Down Expand Up @@ -469,6 +469,10 @@ module.exports = ({
}

/**
* @deprecated - This method was replaced by `fetchTopicMetadata`. This implementation
* is limited by the topics in the target group, so it can't fetch all topics when
* necessary.
*
* Fetch metadata for provided topics.
*
* If no topics are provided fetch metadata for all topics of which we are aware.
Expand Down Expand Up @@ -525,6 +529,51 @@ module.exports = ({
}
}

/**
* Fetch metadata for provided topics.
*
* If no topics are provided fetch metadata for all topics.
* @see https://kafka.apache.org/protocol#The_Messages_Metadata
*
* @param {Object} [options]
* @param {string[]} [options.topics]
* @return {Promise<TopicsMetadata>}
*
* @typedef {Object} TopicsMetadata
* @property {Array<TopicMetadata>} topics
*
* @typedef {Object} TopicMetadata
* @property {String} name
* @property {Array<PartitionMetadata>} partitions
*
* @typedef {Object} PartitionMetadata
* @property {number} partitionErrorCode Response error code
* @property {number} partitionId Topic partition id
* @property {number} leader The id of the broker acting as leader for this partition.
* @property {Array<number>} replicas The set of all nodes that host this partition.
* @property {Array<number>} isr The set of nodes that are in sync with the leader for this partition.
*/
const fetchTopicMetadata = async ({ topics = [] } = {}) => {
if (topics) {
await Promise.all(
topics.map(async topic => {
if (!topic) {
throw new KafkaJSNonRetriableError(`Invalid topic ${topic}`)
}
})
)
}

const metadata = await cluster.metadata({ topics })

return {
topics: metadata.topicMetadata.map(topicMetadata => ({
name: topicMetadata.topic,
partitions: topicMetadata.partitionMetadata,
})),
}
}

/**
* @param {string} eventName
* @param {Function} listener
Expand Down Expand Up @@ -557,6 +606,7 @@ module.exports = ({
createTopics,
deleteTopics,
getTopicMetadata,
fetchTopicMetadata,
events,
fetchOffsets,
fetchTopicOffsets,
Expand Down
20 changes: 20 additions & 0 deletions src/broker/__tests__/metadata.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,26 @@ describe('Broker > Metadata', () => {
})
})

test('can fetch metatada for all topics', async () => {
await broker.connect()
await createTopic({ topic: topicName })
await createTopic({ topic: `test-topic-${secureRandom()}` })

let response = await retryProtocol(
'LEADER_NOT_AVAILABLE',
async () => await broker.metadata([])
)

expect(response.topicMetadata.length).toBeGreaterThanOrEqual(2)

response = await retryProtocol(
'LEADER_NOT_AVAILABLE',
async () => await broker.metadata([topicName])
)

expect(response.topicMetadata.length).toEqual(1)
})

describe('when allowAutoTopicCreation is disabled and the topic does not exist', () => {
beforeEach(() => {
topicName = `test-topic-${secureRandom()}`
Expand Down
38 changes: 38 additions & 0 deletions src/cluster/__tests__/metadata.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
const { createCluster, secureRandom, createTopic } = require('testHelpers')

describe('Cluster > metadata', () => {
let cluster, topic1, topic2, topic3

beforeEach(async () => {
topic1 = `test-topic-${secureRandom()}`
topic2 = `test-topic-${secureRandom()}`
topic3 = `test-topic-${secureRandom()}`

await createTopic({ topic: topic1 })
await createTopic({ topic: topic2 })
await createTopic({ topic: topic3 })

cluster = createCluster()
await cluster.connect()
})

afterEach(async () => {
cluster && (await cluster.disconnect())
})

test('returns metadata for a set of topics', async () => {
const response = await cluster.metadata({ topics: [topic1, topic2] })
expect(response.topicMetadata.length).toEqual(2)

const topics = response.topicMetadata.map(({ topic }) => topic).sort()
expect(topics).toEqual([topic1, topic2].sort())
})

test('returns metadata for all topics', async () => {
const response = await cluster.metadata({ topics: [] })
expect(response.topicMetadata.length).toBeGreaterThanOrEqual(3)

const topics = response.topicMetadata.map(({ topic }) => topic).sort()
expect(topics).toEqual(expect.arrayContaining([topic1, topic2, topic3].sort()))
})
})
19 changes: 19 additions & 0 deletions src/cluster/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,25 @@ module.exports = class Cluster {
await this.brokerPool.refreshMetadataIfNecessary(Array.from(this.targetTopics))
}

/**
* @public
* @returns {Promise<Metadata>}
*/
async metadata({ topics = [] } = {}) {
return this.retrier(async (bail, retryCount, retryTime) => {
try {
await this.brokerPool.refreshMetadataIfNecessary(topics)
return this.brokerPool.withBroker(async ({ broker }) => broker.metadata(topics))
} catch (e) {
if (e.type === 'LEADER_NOT_AVAILABLE') {
throw e
}

bail(e)
}
})
}

/**
* @public
* @param {string} topic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ describe('Consumer > OffsetMananger > commitOffsetsIfNecessary', () => {
})

await offsetManager.commitOffsetsIfNecessary()
await sleep(30)
await sleep(50)

await offsetManager.commitOffsetsIfNecessary()
await offsetManager.commitOffsetsIfNecessary()
Expand Down
2 changes: 1 addition & 1 deletion src/protocol/requests/metadata/v4/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ module.exports = ({ topics, allowAutoTopicCreation = true }) => ({
apiVersion: 4,
apiName: 'Metadata',
encode: async () => {
return new Encoder().writeArray(topics).writeBoolean(allowAutoTopicCreation)
return new Encoder().writeNullableArray(topics).writeBoolean(allowAutoTopicCreation)
},
})
4 changes: 2 additions & 2 deletions testHelpers/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ const fs = require('fs')
const ip = require('ip')
const execa = require('execa')
const uuid = require('uuid/v4')
const semver = require('semver')
const crypto = require('crypto')
const Cluster = require('../src/cluster')
const waitFor = require('../src/utils/waitFor')
Expand Down Expand Up @@ -172,8 +173,7 @@ const addPartitions = async ({ topic, partitions }) => {
}

const testIfKafkaVersion = version => (description, callback, testFn = test) => {
const kafkaVersions = process.env.KAFKA_VERSION.split(/\s*,\s*/)
return kafkaVersions.includes(version)
return semver.gte(semver.coerce(process.env.KAFKA_VERSION), semver.coerce(version))
? testFn(description, callback)
: test.skip(description, callback)
}
Expand Down
5 changes: 5 additions & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4244,6 +4244,11 @@ semver@^5.5.1:
version "5.6.0"
resolved "https://registry.yarnpkg.com/semver/-/semver-5.6.0.tgz#7e74256fbaa49c75aa7c7a205cc22799cac80004"

semver@^6.0.0:
version "6.0.0"
resolved "https://registry.yarnpkg.com/semver/-/semver-6.0.0.tgz#05e359ee571e5ad7ed641a6eec1e547ba52dea65"
integrity sha512-0UewU+9rFapKFnlbirLi3byoOuhrSsli/z/ihNnvM24vgF+8sNBiI1LZPBSH9wJKUwaUbw+s3hToDLCXkrghrQ==

set-blocking@^2.0.0, set-blocking@~2.0.0:
version "2.0.0"
resolved "https://registry.yarnpkg.com/set-blocking/-/set-blocking-2.0.0.tgz#045f9782d011ae9a6803ddd382b24392b3d890f7"
Expand Down

0 comments on commit 92afb5d

Please sign in to comment.