
joeycumines/ts-chan


ts-chan

NPM Package GitHub Repo Code Style: Google

Concurrency primitives for TypeScript and JavaScript.

Introduction

Concurrency in JavaScript, frankly, sucks.

This module is an effort to provide concurrency primitives for TypeScript/JavaScript that capture as much of the semantics of Go's channels as possible, while remaining idiomatic to the language.

I'll be iterating on this for a few weeks, in my spare time, with the goal of a production-ready module that can be used in any JS environment, including browsers.

Architecture

Protocol

To facilitate arbitrary implementations, this module defines a protocol for implementation of channels, modelled as Sender and Receiver. This protocol is styled after JavaScript's iteration protocols, though it differs in that the outer types, Sendable and Receivable (analogues to Iterable), are not intended to support statefulness independently of the channel (no return analogue).

This documentation is a work in progress, so, for now, it may be easiest to peruse src/protocol.ts.
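Until that documentation is complete, the shapes listed in the API section below can be sketched as plain TypeScript types. This is an illustration only: the string-named members and the `Rendezvous` class are assumptions made for this sketch, and the real definitions in src/protocol.ts may differ (for example, in how the members are keyed).

```typescript
// Shapes transcribed from the "Type:" entries in the API section.
// Member naming is an assumption of this sketch.
type ReceiverCallback<T> = (...args: [T, true] | [T | undefined, false]) => void;
type SenderCallback<T> = (...args: [undefined, true] | [unknown, false]) => T;

interface Receiver<T> {
  addReceiver(callback: ReceiverCallback<T>): boolean;
  removeReceiver(callback: ReceiverCallback<T>): void;
}
interface Sender<T> {
  addSender(callback: SenderCallback<T>): boolean;
  removeSender(callback: SenderCallback<T>): void;
  close?(): void;
}
interface Receivable<T> {
  getReceiver(): Receiver<T>;
}
interface Sendable<T> {
  getSender(): Sender<T>;
}

// Hypothetical unbuffered channel: a send completes only when paired with a receive.
class Rendezvous<T> implements Sender<T>, Receiver<T>, Sendable<T>, Receivable<T> {
  private readonly senders: SenderCallback<T>[] = [];
  private readonly receivers: ReceiverCallback<T>[] = [];

  getSender(): Sender<T> {
    return this;
  }

  getReceiver(): Receiver<T> {
    return this;
  }

  addReceiver(callback: ReceiverCallback<T>): boolean {
    const sender = this.senders.shift(); // oldest waiting sender first
    if (sender === undefined) {
      this.receivers.push(callback);
      return true; // added to the receiver list
    }
    callback(sender(undefined, true), true);
    return false; // called immediately
  }

  removeReceiver(callback: ReceiverCallback<T>): void {
    const i = this.receivers.indexOf(callback);
    if (i !== -1) this.receivers.splice(i, 1);
  }

  addSender(callback: SenderCallback<T>): boolean {
    const receiver = this.receivers.shift(); // oldest waiting receiver first
    if (receiver === undefined) {
      this.senders.push(callback);
      return true; // added to the sender list
    }
    receiver(callback(undefined, true), true);
    return false; // called immediately
  }

  removeSender(callback: SenderCallback<T>): void {
    const i = this.senders.indexOf(callback);
    if (i !== -1) this.senders.splice(i, 1);
  }
}

const ch = new Rendezvous<number>();
const received: number[] = [];
// No sender is waiting yet, so the receiver is queued.
const receiverQueued = ch.getReceiver().addReceiver((value, ok) => {
  if (ok && value !== undefined) received.push(value);
});
// A receiver is waiting, so the sender callback is invoked immediately.
const senderQueued = ch.getSender().addSender((err, ok) => {
  if (!ok) throw err;
  return 42;
});
```

The sketch omits closing and buffering entirely; it is only meant to show how the two callback lists pair up.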

Chan class

The Chan class is a reference implementation of the channel protocol. It fully supports the protocol, has Go-like channel close semantics, and supports buffered channels.

Unlike Go's channel, all sends and receives are processed in FIFO order, allowing it to function as a queue, if desired. Also provided are a number of convenience methods and properties, that are not part of the core protocol, but are useful for common use cases.

See the API documentation for more details.

Select class

The absence of an analogue to Go's select statement would limit the usefulness of channels, as the select statement is Go's key to implementing "reactive" software. The Select class provided by this module is intended to fill that gap, and is modelled after Go's select statement, particularly regarding the semantics of receiving and sending. This class utilizes the "channel protocol" (as defined by this module). AbortSignal is fully supported, and can function equivalently to including a case <-ctx.Done(): in Go. Promises are also supported, though they have no analogue in Go.

Comparison to Go's select statement

Similarities
  1. Random Selection: Just as Go's select statement picks one communicative operation using a uniform pseudo-random selection (if more than one is immediately available), the Select class does so too.
  2. Blocking Behavior: In Go, if no communication can proceed and there's no default case, the select statement blocks. Similarly, the Select class's wait method will also block until a case is ready.
  3. Default Case Equivalence: The poll method in the Select class serves a similar purpose as the default case in Go's select statement. If no case is ready, poll will return undefined, offering a non-blocking alternative.
Differences
  1. Return Value from wait and poll: The Select class's wait method returns a promise that resolves with the index of the next ready case. The poll method, on the other hand, returns the index directly or undefined. In contrast, Go's select statement does not return values in this manner. This is a mechanism used to provide type support.
  2. Operation to "receive" value: Once a receive case is ready, in the Select class, the result must be explicitly retrieved using the recv method, which must be provided with the case which is ready. This contrasts with Go, where the received value is directly assigned in the case clause. Again, this is a part of the mechanism used to provide type support.
  3. Limited default value support: Nil channels have no analogue in TS/JS. Additionally, while receiving a "default value" (on receive from a closed channel) is a supported part of the channel protocol, it's not required, and there is no (type-based) mechanism to describe whether a channel supports it.
Different, for now
  1. Case Evaluation Order: It's quite possible that some or all of the case evaluation semantics will be adopted, as an optional feature. The current implementation has the SenderCallback acting as both the mechanism to evaluate expressions (for each value to send), and the mechanism to handle the outcome of the send operation (sends may fail with an error, i.e. if the channel is closed). Go's behavior can be simulated (using the current protocol and implementations), but it requires additional bits.
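
The random selection and poll behaviour described above can be illustrated in isolation. The following is a minimal sketch, not the Select class's actual implementation: `pollReady` and its `random` parameter are hypothetical names introduced here.

```typescript
// Hypothetical helper: returns the index of one ready case, chosen uniformly at
// random, or undefined when no case is ready (the role of Go's default case).
function pollReady(
  ready: ReadonlyArray<() => boolean>,
  random: () => number = Math.random
): number | undefined {
  const candidates: number[] = [];
  ready.forEach((isReady, index) => {
    if (isReady()) candidates.push(index);
  });
  if (candidates.length === 0) return undefined;
  return candidates[Math.floor(random() * candidates.length)];
}

// No case ready: behaves like a select with a default case.
const none = pollReady([() => false, () => false]);
// Exactly one case ready: that case is always chosen.
const only = pollReady([() => false, () => true, () => false]);
// Two cases ready: the injected random source decides between index 0 and 2.
const firstOfTwo = pollReady([() => true, () => false, () => true], () => 0);
const lastOfTwo = pollReady([() => true, () => false, () => true], () => 0.99);
```

Injecting the random source is a choice made for this sketch so that the selection can be exercised deterministically.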

API


Chan

Provides a communication mechanism between two or more concurrent operations.

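In addition to various utility methods, it implements the channel protocol (Sendable, Sender, Receivable, and Receiver), and supports both synchronous and asynchronous iteration (see ChanIterator and ChanAsyncIterator).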

Parameters

  • capacity (optional, default 0)
  • newDefaultValue function (): T?

capacity

Returns the maximum number of items the channel can buffer.

Type: number

Returns number

length

Returns the number of items in the channel buffer.

Type: number

Returns number

concurrency

Returns an integer representing the number of blocking operations. Positive values indicate senders, while negative values indicate receivers.

Type: number

Returns number

trySend

Performs a synchronous send operation on the channel, returning true if it succeeds, or false if there are no waiting receivers, and the channel is full.

Will throw SendOnClosedChannelError if the channel is closed.

Parameters
  • value T

Returns boolean

send

Sends a value to the channel, returning a promise that resolves when it has been received, and rejects on error, or on abort signal.

Parameters
  • value T
  • abort AbortSignal?

Returns Promise<void>

tryRecv

Like trySend, this performs a synchronous recv operation on the channel, returning undefined if no value is available, or an iterator result, which models the received value, and whether the channel is open.

Returns (IteratorResult<T, (T | undefined)> | undefined)

recv

Receives a value from the channel, returning a promise that resolves with an iterator result (the value OR an indicator that the channel is closed, possibly with a default value), or rejects on error, or on abort signal.

Parameters
  • abort AbortSignal?

Returns Promise<IteratorResult<T, (T | undefined)>>
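
As an illustration of how the result shape shared by recv and tryRecv can be interpreted, here is a small sketch. `describeRecv` is a hypothetical helper, not part of this module.

```typescript
// Hypothetical helper that interprets IteratorResult<T, T | undefined>.
function describeRecv<T>(result: IteratorResult<T, T | undefined>): string {
  if (result.done) {
    // Channel closed: value is the default value, or undefined if unsupported.
    return result.value === undefined
      ? 'closed'
      : `closed (default: ${String(result.value)})`;
  }
  // Channel open: value is the received value.
  return `value: ${String(result.value)}`;
}

const openResult = describeRecv<number>({value: 7, done: false});
const closedResult = describeRecv<number>({value: undefined, done: true});
const closedWithDefault = describeRecv<number>({value: 0, done: true});
```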

close

Closes the channel, preventing further sending of values.

See also Sender and Sender.close, which this implements.

  • Once a channel is closed, no more values can be sent to it.
  • If the channel is buffered and there are still values in the buffer when the channel is closed, receivers will continue to receive those values until the buffer is empty.
  • Attempting to send to a closed channel will result in an error and unblock any senders.
  • If the channel is already closed, calling close again will throw a CloseOfClosedChannelError.
  • This method should be used to signal the end of data transmission or prevent potential deadlocks.
  • Throws CloseOfClosedChannelError when attempting to close a channel that is already closed.
  • Throws Error when an error occurs while closing the channel, and no other specific error is thrown.

Returns void
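
The synchronous send, receive, and close rules above can be sketched with a self-contained stand-in. `MiniChan` is hypothetical and much simpler than Chan: it has no waiting senders or receivers and no default value support, and the two error classes are local stand-ins for the errors of the same names documented below.

```typescript
// Local stand-ins, so that the sketch is self-contained.
class SendOnClosedChannelError extends Error {}
class CloseOfClosedChannelError extends Error {}

// Hypothetical, synchronous-only buffered channel.
class MiniChan<T> {
  private readonly buffer: T[] = [];
  private closed = false;

  constructor(readonly capacity: number) {}

  get length(): number {
    return this.buffer.length;
  }

  trySend(value: T): boolean {
    if (this.closed) throw new SendOnClosedChannelError('send on closed channel');
    if (this.buffer.length >= this.capacity) return false; // buffer is full
    this.buffer.push(value);
    return true;
  }

  tryRecv(): IteratorResult<T, T | undefined> | undefined {
    if (this.buffer.length > 0) {
      // Buffered values are delivered in FIFO order, even after close.
      return {value: this.buffer.shift() as T, done: false};
    }
    // Empty buffer: report closure if closed, otherwise nothing is available.
    return this.closed ? {value: undefined, done: true} : undefined;
  }

  close(): void {
    if (this.closed) throw new CloseOfClosedChannelError('close of closed channel');
    this.closed = true;
  }
}

const mini = new MiniChan<string>(2);
const emptyBeforeSend = mini.tryRecv(); // undefined: open, but nothing buffered
const sent = [mini.trySend('a'), mini.trySend('b'), mini.trySend('c')];
mini.close();
// The two buffered values are still received, followed by the closed indicator.
const drained = [mini.tryRecv(), mini.tryRecv(), mini.tryRecv()];

let sendAfterCloseThrew = false;
try {
  mini.trySend('d');
} catch (e) {
  sendAfterCloseThrew = true;
}
```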

ChanIterator

Iterates on all available values. May alternate between returning done and not done, unless ChanIterator.return or ChanIterator.throw are called.

Only the type is exported - instances may only be obtained by iterating over a Chan instance, or by calling chan[Symbol.iterator]().

Parameters

iterator

Returns this.

Returns Iterator<T>

next

Next iteration.

Returns IteratorResult<T>

return

Ends the iterator, which is an idempotent operation.

Returns IteratorResult<T>

throw

Ends the iterator with an error, which is an idempotent operation.

Parameters
  • e any?

Returns IteratorResult<T>
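
To illustrate what "alternating between done and not done" means for ChanIterator, here is a minimal sketch over a plain array. `DrainIterator` is a hypothetical stand-in and makes no claim about how ChanIterator is implemented.

```typescript
// Hypothetical iterator that drains whatever is currently available.
class DrainIterator<T> {
  private ended = false;

  constructor(private readonly buffer: T[]) {}

  [Symbol.iterator](): this {
    return this;
  }

  next(): IteratorResult<T> {
    if (!this.ended && this.buffer.length > 0) {
      return {value: this.buffer.shift() as T, done: false};
    }
    // Nothing available right now (or the iterator was ended).
    return {value: undefined, done: true};
  }

  return(): IteratorResult<T> {
    this.ended = true; // idempotent: repeated calls have no further effect
    return {value: undefined, done: true};
  }
}

const shared = [1, 2];
const it = new DrainIterator(shared);
const r1 = it.next(); // 1
const r2 = it.next(); // 2
const r3 = it.next(); // done: nothing available
shared.push(3);
const r4 = it.next(); // 3: not done again, as more values became available
it.return();
shared.push(4);
const r5 = it.next(); // done: the iterator was ended, so 4 is never yielded
```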

ChanAsyncIterator

Iterates by receiving values from the channel, until it is closed, or the ChanAsyncIterator.return or ChanAsyncIterator.throw methods are called.

Only the type is exported - instances may only be obtained by performing an async iteration over a Chan instance, or by calling chan[Symbol.asyncIterator]().

Parameters

asyncIterator

Returns this.

Returns AsyncIterator<T>

next

Next iteration.

Returns Promise<IteratorResult<T>>

return

Ends the iterator, which is an idempotent operation.

Returns Promise<IteratorResult<T>>

throw

Ends the iterator with an error, which is an idempotent operation.

Parameters
  • e any?

Returns Promise<IteratorResult<T>>

Receiver

Receiver allows callers to receive values. It uses a one-off callback that models what is going to receive the value.

Unlike Iterator, it is not intended to support statefulness - a Receivable should return equivalent (but not necessarily identical) Receiver instances on each call to getReceiver.

Type: {addReceiver: function (callback: ReceiverCallback<T>): boolean, removeReceiver: function (callback: ReceiverCallback<T>): void}

Properties

addReceiver

Add a receiver callback to a list of receivers, or call it immediately if there is an available sender. Returns true if the receiver was added to the receiver list. Returns false if the receiver was called immediately.

Type: function (callback: ReceiverCallback<T>): boolean

removeReceiver

Immediately removes the receiver from the receiver list, if it is there.

Type: function (callback: ReceiverCallback<T>): void

ReceiverCallback

ReceiverCallback is a callback that receives a value from a sender and true, or a default value (or undefined if unsupported), and false, if the channel is closed.

Type: function (...([T, true] | [(T | undefined), false])): void

Receivable

Receivable is a value that can be converted to a Receiver.

Type: {getReceiver: function (): Receiver<T>}

Properties

getReceiver

See Receivable.

Sender

Sender allows callers to send values. It uses a one-off callback that models what is going to send the value.

Unlike Iterator, it is not intended to support statefulness - a Sendable should return equivalent (but not necessarily identical) Sender instances on each call to getSender.

See also SendOnClosedChannelError, which SHOULD be raised on addSender (if closed on add) or passed into send callbacks (otherwise), when attempting to send on a closed channel.

Type: {addSender: function (callback: SenderCallback<T>): boolean, removeSender: function (callback: SenderCallback<T>): void, close: function (): void?}

Properties

addSender

Add a sender callback to a list of senders, or call it immediately if there is an available receiver. Returns true if the sender was added to the sender list. Returns false if the sender was called immediately. If the channel is closed, SHOULD throw SendOnClosedChannelError. If the channel is closed while the sender is waiting to be called, the sender SHOULD be called with SendOnClosedChannelError.

Type: function (callback: SenderCallback<T>): boolean

removeSender

Immediately removes the sender from the sender list, if it is there.

Type: function (callback: SenderCallback<T>): void

close

Closes the channel, adhering to the following semantics similar to Go's channels:

  • Once a channel is closed, no more values can be sent to it.
  • If a channel is buffered, and there are still values in the buffer when the channel is closed, the receivers will continue to receive those values until the buffer is empty.
  • It's the responsibility of the sender to close the channel, signaling to the receiver that no more data will be sent.
  • Attempting to send to a closed channel MUST result in an error, and MUST un-block any such senders as part of said close.
  • The error thrown when attempting to send on a closed channel SHOULD be SendOnClosedChannelError, but MAY be another error.
  • Unless explicitly documented as idempotent, close SHOULD throw CloseOfClosedChannelError on subsequent calls, but MAY throw other errors.
  • Channels should be closed to prevent potential deadlocks or to signal the end of data transmission. This ensures that receivers waiting on the channel don't do so indefinitely.

Note: This method is optional. Some Sendable implementations may specify their own rules and semantics for closing channels. Always refer to the specific implementation's documentation to ensure correct usage and to prevent potential memory leaks or unexpected behaviors.

See also SendOnClosedChannelError and CloseOfClosedChannelError.

Type: function (): void
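
The requirement that close un-blocks waiting senders can be sketched as follows. `PendingSenders` is a hypothetical, sender-only stand-in with no receivers, the error classes are local stand-ins for the ones documented below, and the callback type is transcribed from the SenderCallback entry.

```typescript
// Local stand-ins, so that the sketch is self-contained.
class SendOnClosedChannelError extends Error {}
class CloseOfClosedChannelError extends Error {}

type SenderCallback<T> = (...args: [undefined, true] | [unknown, false]) => T;

// Hypothetical sender side of a channel that never has a receiver: every sender waits.
class PendingSenders<T> {
  private readonly senders: SenderCallback<T>[] = [];
  private closed = false;

  addSender(callback: SenderCallback<T>): boolean {
    if (this.closed) throw new SendOnClosedChannelError('send on closed channel');
    this.senders.push(callback);
    return true; // added to the sender list
  }

  close(): void {
    if (this.closed) throw new CloseOfClosedChannelError('close of closed channel');
    this.closed = true;
    const err = new SendOnClosedChannelError('send on closed channel');
    // Un-block every waiting sender, oldest first, as part of the close.
    for (const sender of this.senders.splice(0)) {
      try {
        sender(err, false);
      } catch (e) {
        // A callback rethrowing the very same error is expected, not a failure.
        if (e !== err) throw e;
      }
    }
  }
}

const pending = new PendingSenders<string>();
const outcomes: string[] = [];
pending.addSender((err, ok) => {
  if (!ok) {
    outcomes.push('unblocked');
    throw err;
  }
  return 'hello';
});
pending.close();

let secondCloseThrew = false;
try {
  pending.close();
} catch (e) {
  secondCloseThrew = true;
}

let lateSendThrew = false;
try {
  pending.addSender(() => 'late');
} catch (e) {
  lateSendThrew = true;
}
```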

SenderCallback

SenderCallback is called as a value is received, or when an error or some other event occurs, which prevents the value from being received. It accepts two parameters, an error (if any), and the boolean ok, indicating if the value has been (will be, after return) received. It MUST return the value (or throw) if ok is true, and SHOULD throw err if ok is false.

The ok parameter being true guarantees that a value (once returned) has been received, though does not guarantee that anything will be done with it.

If the ok parameter is false, the first parameter will contain any error, and no value (regardless of what is returned) will be received.

Note: The sender callback is not called on removeSender.

WARNING: If the same value (===) as err (when ok is false) is thrown, that thrown error will not be bubbled - a mechanism used to avoid breaking the typing of the return value.

Type: function (...([undefined, true] | [any, false])): T
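
A callback that follows this contract might look like the sketch below. `sendValue` and its `onOutcome` parameter are hypothetical names used only for illustration, and the type alias is transcribed from the entry above.

```typescript
type SenderCallback<T> = (...args: [undefined, true] | [unknown, false]) => T;

// Hypothetical factory that builds a conforming callback for a fixed value.
function sendValue<T>(
  value: T,
  onOutcome?: (received: boolean) => void
): SenderCallback<T> {
  return (err, ok) => {
    onOutcome?.(ok);
    if (!ok) throw err; // SHOULD throw err when the value was not received
    return value; // MUST return the value when ok is true
  };
}

const outcomeLog: boolean[] = [];
const callback = sendValue('payload', received => {
  outcomeLog.push(received);
});

// Success path: the channel asks for the value.
const delivered = callback(undefined, true);

// Failure path: the channel reports an error, which the callback rethrows.
const failure = new Error('channel closed');
let thrown: unknown;
try {
  callback(failure, false);
} catch (e) {
  thrown = e;
}
```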

Sendable

Sendable is a value that can be converted to a Sender.

Type: {getSender: function (): Sender<T>}

Properties

  • getSender function (): Sender<T>

getSender

See Sendable.

SendOnClosedChannelError

Extends Error

Provided as a convenience, that SHOULD be used by Sender implementations, to indicate that a channel is closed. Should be raised as a result of send attempts on a closed channel, where the send operation is not allowed to proceed.

Parameters

  • args ...ConstructorParameters<any>

CloseOfClosedChannelError

Extends Error

Provided as a convenience, that SHOULD be used by Sender implementations, in the event that a channel close is attempted more than once.

Parameters

  • args ...ConstructorParameters<any>

SelectCase

SelectCase models the state of a single case in a Select.

WARNING: The selectState symbol is deliberately not exported, as the value of SelectCase[selectState] is not part of the API contract, and is simply a mechanism to support typing.

Type: (SelectCaseSender<T> | SelectCaseReceiver<T> | SelectCasePromise<T>)

recv

Prepares a SelectCaseReceiver case, to be used in a Select.

WARNING: Cases may only be used in a single select instance, though select instances are intended to be reused, e.g. when implementing control loops.

Parameters

Returns SelectCaseReceiver<T>

send

Prepares a SelectCaseSender case, to be used in a Select.

WARNING: Cases may only be used in a single select instance, though select instances are intended to be reused, e.g. when implementing control loops.

Parameters

Returns SelectCaseSender<T>

Select

Select implements the functionality of Go's select statement, with support for cases comprising a Sender, a Receiver, or a value (resolved as a promise), the last of which is treated as a single-value, never-closed channel.

Parameters

cases

Retrieves the cases associated with this select instance.

Each case corresponds to an input case (including order). After selecting a case, via poll or wait, received values may be retrieved by calling recv with the corresponding case.

Type: SelectCases<T>

Examples

Accessing a (typed) received value:

import {recv, Chan, Select} from 'ts-chan';

const ch1 = new Chan<number>();
const ch2 = new Chan<string>();

void sendsToCh1ThenEventuallyClosesIt();
void sendsToCh2();

const select = new Select([recv(ch1), recv(ch2)]);
for (let running = true; running;) {
  const i = await select.wait();
  switch (i) {
  case 0: {
    const v = select.recv(select.cases[i]);
    if (v.done) {
      running = false;
      break;
    }
    console.log(`rounded value: ${Math.round(v.value)}`);
    break;
  }
  case 1: {
    const v = select.recv(select.cases[i]);
    if (v.done) {
      throw new Error('ch2 unexpectedly closed');
    }
    console.log(`uppercase string value: ${v.value.toUpperCase()}`);
    break;
  }
  default:
    throw new Error('unreachable');
  }
}

Returns SelectCases<T>

poll

Poll returns the next case that is ready, or undefined if none are ready. It must not be called concurrently with wait or recv.

This is effectively a non-blocking version of wait, and fills the same role as the default select case, in Go's select statement.

Returns (number | undefined)

wait

Wait returns a promise that will resolve with the index of the next case that is ready, or reject with the first error.

Parameters
  • abort AbortSignal?

Returns Promise<number>

recv

Consume the result of a ready case.

Parameters

Returns IteratorResult<T, (T | undefined)>
