Skip to content

Commit

Permalink
fix(client): Subscribe pipeline clears cache on decrypt error (stream…
Browse files Browse the repository at this point in the history
…r-dev#1458)

Small fix to subscribe pipeline error handling: if there was a decrypt error, the cache was not cleared. The `try-catch` never caught anything as we didn't await the promise which was returned from `decrypt()`.
  • Loading branch information
teogeb authored Jun 5, 2023
1 parent 5fe9bd5 commit cbd57f1
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 8 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ found [here](packages/broker/CHANGELOG.md).

#### Fixed

- Clear permissions cache when message decryption fails (https://github.com/streamr-dev/network/pull/1458)

#### Security

### cli-tools
Expand Down
2 changes: 1 addition & 1 deletion packages/client/src/subscribe/subscribePipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export const createSubscribePipeline = (opts: SubscriptionPipelineOptions): Mess
await validateStreamMessage(msg, opts.streamRegistryCached)
if (StreamMessage.isAESEncrypted(msg)) {
try {
return decrypt(msg, opts.groupKeyManager, opts.destroySignal)
return await decrypt(msg, opts.groupKeyManager, opts.destroySignal)
} catch (err) {
// TODO log this in onError? if we want to log all errors?
logger.debug('Failed to decrypt', { messageId: msg.getMessageID(), err })
Expand Down
19 changes: 12 additions & 7 deletions packages/client/test/unit/subscribePipeline.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import 'reflect-metadata'

import { Wallet } from '@ethersproject/wallet'
import { EncryptionType, MessageID, StreamMessage, StreamPartID, StreamPartIDUtils, toStreamID } from '@streamr/protocol'
import { EncryptionType, MessageID, StreamMessage, StreamPartID, StreamPartIDUtils } from '@streamr/protocol'
import { fastWallet, randomEthereumAddress } from '@streamr/test-utils'
import { collect, toEthereumAddress } from '@streamr/utils'
import { mock } from 'jest-mock-extended'
Expand All @@ -15,6 +15,7 @@ import { LitProtocolFacade } from '../../src/encryption/LitProtocolFacade'
import { SubscriberKeyExchange } from '../../src/encryption/SubscriberKeyExchange'
import { StreamrClientEventEmitter } from '../../src/events'
import { createSignedMessage } from '../../src/publish/MessageFactory'
import { StreamRegistryCached } from '../../src/registry/StreamRegistryCached'
import { createSubscribePipeline } from "../../src/subscribe/subscribePipeline"
import { mockLoggerFactory } from '../test-utils/utils'
import { GroupKey } from './../../src/encryption/GroupKey'
Expand All @@ -27,6 +28,7 @@ const CONTENT = {
describe('subscribePipeline', () => {

let pipeline: MessageStream
let streamRegistryCached: Partial<StreamRegistryCached>
let streamPartId: StreamPartID
let publisher: Wallet

Expand Down Expand Up @@ -55,7 +57,7 @@ describe('subscribePipeline', () => {
streamPartId = StreamPartIDUtils.parse(`${randomEthereumAddress()}/path#0`)
publisher = fastWallet()
const stream = new Stream(
toStreamID(streamPartId),
StreamPartIDUtils.getStreamID(streamPartId),
{
partitions: 1,
},
Expand All @@ -81,6 +83,11 @@ describe('subscribePipeline', () => {
maxKeyRequestsPerSecond: 0
}
}
streamRegistryCached = {
getStream: async () => stream,
isStreamPublisher: async () => true,
clearStream: jest.fn()
}
pipeline = createSubscribePipeline({
streamPartId,
loggerFactory: mockLoggerFactory(),
Expand All @@ -94,11 +101,7 @@ describe('subscribePipeline', () => {
createPrivateKeyAuthentication(publisher.privateKey, {} as any),
config
),
streamRegistryCached: {
getStream: async () => stream,
isStreamPublisher: async () => true,
clearStream: () => {}
} as any,
streamRegistryCached: streamRegistryCached as any,
destroySignal,
config: config as any
})
Expand Down Expand Up @@ -159,5 +162,7 @@ describe('subscribePipeline', () => {
expect(error).toBeInstanceOf(DecryptError)
expect(error.message).toMatch(/timed out/)
expect(output).toEqual([])
expect(streamRegistryCached.clearStream).toBeCalledTimes(1)
expect(streamRegistryCached.clearStream).toBeCalledWith(StreamPartIDUtils.getStreamID(streamPartId))
})
})

0 comments on commit cbd57f1

Please sign in to comment.