Skip to content

Commit

Permalink
transform: Transform response data into an event emitter for piping (#11
Browse files Browse the repository at this point in the history
)
  • Loading branch information
codenothing authored Mar 2, 2023
1 parent ce2cf17 commit aec54de
Show file tree
Hide file tree
Showing 3 changed files with 166 additions and 1 deletion.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ const request = client.getServerStreamRequest("v1.Customers.FindCustomers", {

await client.makeBidiStreamRequest(
"v1.Customers.CreateCustomers",
request,
request.transform(async (customer) => {
return { ...customer, isCopied: true };
}),
async (row) => {
row; // Incoming response row chunk
}
Expand Down
74 changes: 74 additions & 0 deletions src/ProtoRequest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1214,6 +1214,80 @@ export class ProtoRequest<RequestType, ResponseType> extends EventEmitter {
);
}

/**
* Pipes response data from this request through the transformer and out into
* a new event emitter. This can be useful for piping one client request to
* another with an async transformer on each message
* @param {DataTransformer} transformer Async function for transforming data
* @returns {EventEmitter} A new event emitter instance
*/
public transform<OutputType>(
transformer: (data: ResponseType) => Promise<OutputType>
): EventEmitter {
const emitter = new EventEmitter();

// Local refs
let counter = 0;
let ended = false;
let finished = false;

// Process each data chunk through the transformer
const onData = (data: ResponseType) => {
counter++;
transformer(data)
.then((output) => {
if (finished) {
return;
}

emitter.emit("data", output);
if (--counter < 1 && ended) {
finished = true;
emitter.emit("end");
}
})
.catch((e) => {
if (!finished) {
finished = true;
emitter.emit("error", e);
}
});
};

// Stop processing on error
const onError = (e: Error) => {
remoteEvents();
finished = true;
emitter.emit("error", e);
};

// Keep track of when there are no more data events incoming,
// but don't signal end on the emitter until after all transforms
// have completed
const onEnd = () => {
remoteEvents();
ended = true;
if (counter < 1) {
finished = true;
emitter.emit("end");
}
};

// Normalized event removal
const remoteEvents = () => {
this.off("data", onData);
this.off("error", onError);
this.off("end", onEnd);
};

// Bind stream events
this.on("data", onData);
this.on("error", onError);
this.on("end", onEnd);

return emitter;
}

/**
* Aborts the request if it is still active
*/
Expand Down
89 changes: 89 additions & 0 deletions test/pipe.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,4 +142,93 @@ describe("pipe", () => {
await request.waitForEnd();
expect(request.error?.message).toStrictEqual(`Pipe stream error`);
});

describe("transform", () => {
test("should handle piping a transformed request to another client request", async () => {
const readStreamRequest = getServerStreamRequest();
const { result } = await makeClientStreamRequest(
readStreamRequest.transform<Customer>(async (data) => {
return { id: data.id, name: data.name?.toUpperCase() };
})
);
expect(result).toEqual({
customers: [
{ id: "github", name: "GITHUB" },
{ id: "npm", name: "NPM" },
{ id: "circleci", name: "CIRCLECI" },
],
});
});

test("should handle a delay in transforming", async () => {
const readStreamRequest = getServerStreamRequest();
const { result } = await makeClientStreamRequest(
readStreamRequest.transform<Customer>(async (data) => {
await wait(5);
return { id: data.id, name: data.name?.toUpperCase() };
})
);
expect(result).toEqual({
customers: [
{ id: "github", name: "GITHUB" },
{ id: "npm", name: "NPM" },
{ id: "circleci", name: "CIRCLECI" },
],
});
});

test("should handle errors from the piped request", async () => {
THROW_FIND_ERROR = new MockServiceError(
status.INTERNAL,
`Mock Piped Request Error`
);
const readStreamRequest = getServerStreamRequest();
const { result, error } = await makeClientStreamRequest(
readStreamRequest.transform<Customer>(async (data) => {
await wait(5);
return { id: data.id, name: data.name?.toUpperCase() };
})
);
expect(error).toBeInstanceOf(Error);
expect(error?.message).toStrictEqual(
`13 INTERNAL: Mock Piped Request Error`
);
expect(result).toBeUndefined();
});

test("should handle transform errors", async () => {
const readStreamRequest = getServerStreamRequest();
const { result, error } = await makeClientStreamRequest(
readStreamRequest.transform(async () => {
await wait(5);
throw new Error(`Mock Transform Error`);
})
);
expect(error).toBeInstanceOf(Error);
expect(error?.message).toStrictEqual(`Mock Transform Error`);
expect(result).toBeUndefined();
});

test("should ignore all messages after a transform error", async () => {
let firstTransform = true;
const readStreamRequest = getServerStreamRequest();
const { result, error } = await makeClientStreamRequest(
readStreamRequest.transform(async (data) => {
if (firstTransform) {
firstTransform = false;
await wait(10);
throw new Error(`Mock Delayed Transform Error`);
} else {
await wait(20);
return { id: data.id, name: data.name?.toUpperCase() };
}
})
);
// Wait for the delayed transforms to complete
await wait(30);
expect(error).toBeInstanceOf(Error);
expect(error?.message).toStrictEqual(`Mock Delayed Transform Error`);
expect(result).toBeUndefined();
});
});
});

0 comments on commit aec54de

Please sign in to comment.