Skip to content

Commit

Permalink
feat(broker): [NET-978] Support partitionKey and `partitionKeyField…
Browse files Browse the repository at this point in the history
…` in MQTT plugin (streamr-dev#1447)

It is now possible to define partition using `partitionKey` or `partitionKeyField` for MQTT publishing. 

These parameters are not supports for MQTT subscribing (i.e. the behavior is similar to `websocket` plugin).

### Refactoring

Extracted partition handling logic to helper functions so that `websocket` and `mqtt` plugin can use the same utilities.
  • Loading branch information
teogeb authored Jun 5, 2023
1 parent 7dd47ef commit 5fe9bd5
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 28 deletions.
3 changes: 3 additions & 0 deletions packages/broker/plugins.md
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,9 @@ By default the plugin publishes and subscribes to partition `0`. If you want to
await client.publish('/foobar?partition=5', ...)
```

For publishing, it is also possible select the partition using `partitionKey`/`partitionKeyField` query parameter in the topic. See [above](#partitions) how the partition is calculated in that case.


## HTTP

At the moment, only publishing is supported over HTTP. To subscribe, use one of the other protocol plugins as they allow a continuous streaming connection.
Expand Down
31 changes: 31 additions & 0 deletions packages/broker/src/helpers/partitions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { ParsedQs } from 'qs'
import { parsePositiveInteger, parseQueryParameter } from './parser'

export class PublishPartitionDefinition {
partition?: number
partitionKey?: string
partitionKeyField?: string
}

export const parsePublishPartitionDefinition = (queryParams: ParsedQs): PublishPartitionDefinition => {
const partition = parseQueryParameter<number>('partition', queryParams, parsePositiveInteger)
const partitionKey = queryParams['partitionKey'] as string | undefined
const partitionKeyField = queryParams['partitionKeyField'] as string | undefined
const partitionDefinitions = [partition, partitionKey, partitionKeyField].filter((d) => d !== undefined)
if (partitionDefinitions.length > 1) {
throw new Error('Invalid combination of "partition", "partitionKey" and "partitionKeyField"')
}
return {
partition,
partitionKey,
partitionKeyField
}
}

export const getPartitionKey = (content: Record<string, unknown>, definition: PublishPartitionDefinition): string | undefined => {
return definition.partitionKey ?? (
definition.partitionKeyField
? (content[definition.partitionKeyField] as string)
: undefined
)
}
36 changes: 27 additions & 9 deletions packages/broker/src/plugins/mqtt/Bridge.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { StreamPartID, toStreamID, toStreamPartID } from '@streamr/protocol'
import { StreamID, StreamPartID, toStreamID, toStreamPartID } from '@streamr/protocol'
import { Logger } from '@streamr/utils'
import without from 'lodash/without'
import { MessageMetadata, StreamrClient, Subscription } from 'streamr-client'
import { PayloadFormat } from '../../helpers/PayloadFormat'
import { Message, PayloadFormat } from '../../helpers/PayloadFormat'
import { parsePositiveInteger, parseQueryAndBase, parseQueryParameter } from '../../helpers/parser'
import { PublishPartitionDefinition, getPartitionKey, parsePublishPartitionDefinition } from '../../helpers/partitions'
import { MqttServer, MqttServerListener } from './MqttServer'

const DEFAULT_PARTITION = 0
Expand Down Expand Up @@ -39,17 +40,23 @@ export class Bridge implements MqttServerListener {
}

async onMessageReceived(topic: string, payload: string, clientId: string): Promise<void> {
let message
let message: Message
let streamPart: { streamId: StreamID } & PublishPartitionDefinition
try {
message = this.payloadFormat.createMessage(payload)
streamPart = this.getPublishStreamPart(topic)
} catch (err) {
logger.warn('Unable to form message', { err, topic, clientId })
return
}
const { content, metadata } = message
try {
const publishedMessage = await this.streamrClient.publish(this.getStreamPartition(topic), content, {
const publishedMessage = await this.streamrClient.publish({
id: streamPart.streamId,
partition: streamPart.partition
}, content, {
timestamp: metadata.timestamp,
partitionKey: getPartitionKey(content, streamPart),
msgChainId: clientId
})
this.publishMessageChains.add(createMessageChainKey(publishedMessage))
Expand All @@ -60,7 +67,7 @@ export class Bridge implements MqttServerListener {

async onSubscribed(topic: string, clientId: string): Promise<void> {
logger.info('Handle client subscribe', { clientId, topic })
const streamPart = this.getStreamPartition(topic)
const streamPart = this.getSubscribeStreamPart(topic)
const existingSubscription = this.getSubscription(streamPart)
if (existingSubscription === undefined) {
const streamrClientSubscription = await this.streamrClient.subscribe(streamPart, (content: any, metadata: MessageMetadata) => {
Expand Down Expand Up @@ -105,7 +112,7 @@ export class Bridge implements MqttServerListener {

onUnsubscribed(topic: string, clientId: string): void {
logger.info('Handle client unsubscribe', { clientId, topic })
const streamPart = this.getStreamPartition(topic)
const streamPart = this.getSubscribeStreamPart(topic)
const existingSubscription = this.getSubscription(streamPart)
if (existingSubscription !== undefined) {
existingSubscription.clientIds = without(existingSubscription.clientIds, clientId)
Expand All @@ -116,11 +123,22 @@ export class Bridge implements MqttServerListener {
}
}

private getStreamPartition(topic: string): StreamPartID {
private getSubscribeStreamPart(topic: string): StreamPartID {
const { base, query } = parseQueryAndBase(topic)
const streamId = (this.streamIdDomain !== undefined) ? `${this.streamIdDomain}/${base}` : base
const partition = parseQueryParameter('partition', query, parsePositiveInteger)
return toStreamPartID(toStreamID(streamId), partition ?? DEFAULT_PARTITION)
return toStreamPartID(this.getStreamId(base), partition ?? DEFAULT_PARTITION)
}

private getPublishStreamPart(topic: string): { streamId: StreamID } & PublishPartitionDefinition {
const { base, query } = parseQueryAndBase(topic)
return {
streamId: this.getStreamId(base),
...parsePublishPartitionDefinition(query)
}
}

private getStreamId(topicBase: string): StreamID {
return toStreamID((this.streamIdDomain !== undefined) ? `${this.streamIdDomain}/${topicBase}` : topicBase)
}

private getSubscription(streamPartId: StreamPartID): StreamSubscription | undefined {
Expand Down
23 changes: 7 additions & 16 deletions packages/broker/src/plugins/websocket/PublishConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,18 @@ import { StreamrClient } from 'streamr-client'
import { Logger } from '@streamr/utils'
import { ParsedQs } from 'qs'
import { v4 as uuid } from 'uuid'
import { parsePositiveInteger, parseQueryParameter } from '../../helpers/parser'
import { Connection, PING_PAYLOAD } from './Connection'
import { PayloadFormat } from '../../helpers/PayloadFormat'
import { PublishPartitionDefinition, getPartitionKey, parsePublishPartitionDefinition } from '../../helpers/partitions'

export class PublishConnection implements Connection {

streamId: string
partition?: number
partitionKey?: string
partitionKeyField?: string
partitionDefinition: PublishPartitionDefinition

constructor(streamId: string, queryParams: ParsedQs) {
this.streamId = streamId
this.partition = parseQueryParameter<number>('partition', queryParams, parsePositiveInteger)
this.partitionKey = queryParams['partitionKey'] as string | undefined
this.partitionKeyField = queryParams['partitionKeyField'] as string | undefined
const partitionDefinitions = [this.partition, this.partitionKey, this.partitionKeyField].filter((d) => d !== undefined)
if (partitionDefinitions.length > 1) {
throw new Error('Invalid combination of "partition", "partitionKey" and "partitionKeyField"')
}
this.partitionDefinition = parsePublishPartitionDefinition(queryParams)
}

async init(
Expand All @@ -38,21 +30,20 @@ export class PublishConnection implements Connection {
if (payload !== PING_PAYLOAD) {
try {
const { content, metadata } = payloadFormat.createMessage(payload)
const partitionKey = this.partitionKey ?? (this.partitionKeyField ? (content[this.partitionKeyField] as string) : undefined)
await streamrClient.publish({
id: this.streamId,
partition: this.partition
partition: this.partitionDefinition.partition
}, content, {
timestamp: metadata.timestamp,
partitionKey,
partitionKey: getPartitionKey(content, this.partitionDefinition),
msgChainId
})
} catch (err: any) {
logger.warn('Unable to publish', {
err,
streamId: this.streamId,
partition: this.partition,
partitionKey: this.partitionKey,
partition: this.partitionDefinition.partition,
partitionKey: this.partitionDefinition.partitionKey,
msgChainId,
})
}
Expand Down
45 changes: 42 additions & 3 deletions packages/broker/test/unit/plugins/mqtt/Bridge.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@ describe('MQTT Bridge', () => {

it('onMessageReceived', async () => {
await bridge.onMessageReceived(topic, JSON.stringify(MOCK_CONTENT), MOCK_CLIENT_ID)
expect(streamrClient.publish).toBeCalledWith(`${MOCK_STREAM_ID}#0`, MOCK_CONTENT, { msgChainId: expect.any(String) })
expect(streamrClient.publish).toBeCalledWith(
{ id: MOCK_STREAM_ID, partition: undefined },
MOCK_CONTENT,
{ msgChainId: expect.any(String) }
)
})

it('onSubscribed', async () => {
Expand Down Expand Up @@ -98,12 +102,47 @@ describe('MQTT Bridge', () => {
bridge = new Bridge(streamrClient as any, undefined as any, new PlainPayloadFormat(), undefined)
})

it('publish', async () => {
it('publish with partition', async () => {
await bridge.onMessageReceived(`${MOCK_TOPIC}?partition=5`, JSON.stringify(MOCK_CONTENT), MOCK_CLIENT_ID)
expect(streamrClient.publish).toBeCalledWith(
`${MOCK_TOPIC}#5`,
{
id: MOCK_TOPIC,
partition: 5
},
MOCK_CONTENT,
{
msgChainId: MOCK_CLIENT_ID,
timestamp: undefined
}
)
})

it('publish with partition key', async () => {
await bridge.onMessageReceived(`${MOCK_TOPIC}?partitionKey=mock-key`, JSON.stringify(MOCK_CONTENT), MOCK_CLIENT_ID)
expect(streamrClient.publish).toBeCalledWith(
{
id: MOCK_TOPIC,
partition: undefined
},
MOCK_CONTENT,
{
partitionKey: 'mock-key',
msgChainId: MOCK_CLIENT_ID,
timestamp: undefined
}
)
})

it('publish with partition key field', async () => {
await bridge.onMessageReceived(`${MOCK_TOPIC}?partitionKeyField=foo`, JSON.stringify(MOCK_CONTENT), MOCK_CLIENT_ID)
expect(streamrClient.publish).toBeCalledWith(
{
id: MOCK_TOPIC,
partition: undefined
},
MOCK_CONTENT,
{
partitionKey: MOCK_CONTENT.foo,
msgChainId: MOCK_CLIENT_ID,
timestamp: undefined
}
Expand Down

0 comments on commit 5fe9bd5

Please sign in to comment.