Skip to content

Commit

Permalink
Refactor StorageCommunicationEndpointProvider to return StorageCommun…
Browse files Browse the repository at this point in the history
…icationEndpoint per StoreInfo (#6614)

This patch moves the StoreInfo argument from the
getStorageEndpointProvider() call to the getStorageEndpoint() call, which means that
a single StorageEndpointProvider can be used for a set of StorageProxies.

Eventually this single StorageEndpointProvider will become a separate
StorageEndpointManager, rather than being a responsibility of Stores directly.
This will allow the same manager to be used in both direct communication mode
and in hosted (via PEC) mode.

A short-term consequence is that both StoreInfo and an ActiveStore are passed
into the StorageProxy constructor (the ActiveStore is currently still the
StorageEndpointProvider and StoreInfo is needed to get the endpoint). This
duplication will be resolved in a subsequent patch (the ActiveStore argument
will become a StorageEndpoint instead, vended by a StorageEndpointManager
which will have responsibility for creating ActiveStores on demand).
________________________________________________________________________________________________

next steps:
- DirectStorageEndpointManager should implement StorageCommunicationEndpointProvider, and ActiveStore StorageCommunicationEndpoint instead.

Closes #6614

PiperOrigin-RevId: 347933211
  • Loading branch information
mariakleiner authored and arcs-c3po committed Dec 17, 2020
1 parent d7f7b04 commit 638a50a
Show file tree
Hide file tree
Showing 17 changed files with 133 additions and 100 deletions.
2 changes: 1 addition & 1 deletion src/planning/plan/plan-producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ export class PlanProducer {
this.searchStore = searchStore;
this.inspector = inspector;
if (this.searchStore) {
this.handle = handleForActiveStore(this.searchStore, this.arc);
this.handle = handleForActiveStore(this.searchStore.storeInfo, this.searchStore, this.arc);
this.searchStoreCallbackId = this.searchStore.on(() => this.onSearchChanged());
}
this.debug = debug;
Expand Down
2 changes: 1 addition & 1 deletion src/planning/plan/planificator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ export class Planificator {
}

async _storeSearch(): Promise<void> {
const handle = handleForActiveStore(this.searchStore, this.arc);
const handle = handleForActiveStore(this.searchStore.storeInfo, this.searchStore, this.arc);
const handleValue = await handle.fetch();
const values = handleValue ? JSON.parse(handleValue.current) : [];

Expand Down
2 changes: 1 addition & 1 deletion src/planning/plan/planning-result.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ export class PlanningResult {
assert(envOptions.storageManager, `storageManager cannot be null`);
this.store = store;
if (this.store) {
this.handle = handleForActiveStore(store, {...envOptions.context, storageManager: envOptions.storageManager});
this.handle = handleForActiveStore(store.storeInfo, store, {...envOptions.context, storageManager: envOptions.storageManager});
this.storeCallbackId = this.store.on(() => this.load());
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/planning/plan/tests/plan-producer-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ describe('plan producer - search', () => {
}

async setNextSearch(search: string) {
const handle = handleForActiveStore(this.searchStore, this.arc);
const handle = handleForActiveStore(this.searchStore.storeInfo, this.searchStore, this.arc);
await handle.setFromData({current: JSON.stringify([{arc: this.arc.id.idTreeAsString(), search}])});
return this.onSearchChanged();
}
Expand Down
15 changes: 8 additions & 7 deletions src/runtime/particle-execution-context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,19 +81,19 @@ export class ParticleExecutionContext {
this.apiPort = new class extends PECInnerPort implements StorageFrontend {

onDefineHandle(identifier: string, storeInfo: StoreInfo<Type>, name: string, ttl: Ttl) {
return new StorageProxy(identifier, pec.getStorageEndpointProvider(storeInfo), ttl);
return new StorageProxy(identifier, storeInfo, pec.getStorageEndpointProvider(), ttl);
}

onDefineHandleFactory(identifier: string, storeInfo: StoreInfo<Type>, name: string, ttl: Ttl) {
return new StorageProxyMuxer(pec.getStorageEndpointProvider(storeInfo));
return new StorageProxyMuxer(storeInfo, pec.getStorageEndpointProvider());
}

onGetDirectStoreMuxerCallback(
storeInfo: StoreInfo<Type>,
callback: (storageProxyMuxer: StorageProxyMuxer<CRDTTypeRecord>, key: string) => void,
name: string,
id: string) {
const storageProxyMuxer = new StorageProxyMuxer(pec.getStorageEndpointProvider(storeInfo));
const storageProxyMuxer = new StorageProxyMuxer(storeInfo, pec.getStorageEndpointProvider());
return [storageProxyMuxer, () => callback(storageProxyMuxer, storeInfo.storageKey.toString())];
}

Expand All @@ -103,7 +103,7 @@ export class ParticleExecutionContext {
name: string,
id: string) {
// TODO(shanestephens): plumb storageKey through to internally created handles too.
const proxy = new StorageProxy(id, pec.getStorageEndpointProvider(storeInfo));
const proxy = new StorageProxy(id, storeInfo, pec.getStorageEndpointProvider());
return [proxy, () => callback(proxy)];
}

Expand Down Expand Up @@ -175,11 +175,12 @@ export class ParticleExecutionContext {
return this.idGenerator.newChildId(this.pecId).toString();
}

getStorageEndpointProvider(storeInfo: StoreInfo<Type>): StorageCommunicationEndpointProvider<CRDTTypeRecord> {
getStorageEndpointProvider(): StorageCommunicationEndpointProvider<CRDTTypeRecord> {
const pec = this;
return {
get storeInfo(): StoreInfo<CRDTTypeRecordToType<CRDTTypeRecord>> { return storeInfo; },
getStorageEndpoint(storageProxy: StorageProxy<CRDTTypeRecord> | StorageProxyMuxer<CRDTTypeRecord>): StorageCommunicationEndpoint<CRDTTypeRecord> {
getStorageEndpoint(
storeInfo: StoreInfo<CRDTTypeRecordToType<CRDTTypeRecord>>,
storageProxy: StorageProxy<CRDTTypeRecord> | StorageProxyMuxer<CRDTTypeRecord>): StorageCommunicationEndpoint<CRDTTypeRecord> {
return createStorageEndpoint(storageProxy, pec, pec.apiPort);
}
};
Expand Down
45 changes: 3 additions & 42 deletions src/runtime/storage/active-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@ import {PropagatedException} from '../arc-exceptions.js';
import {CRDTTypeRecord} from '../../crdt/lib-crdt.js';
import {Exists} from './drivers/driver.js';
import {StorageKey} from './storage-key.js';
import {noAwait} from '../../utils/lib-utils.js';
import {ChannelConstructor} from '../channel-constructor.js';
import {CRDTTypeRecordToType} from './storage.js';
import {StoreInfo} from './store-info.js';
import {StoreInterface, StorageCommunicationEndpointProvider, StorageMode, StoreConstructorOptions, ProxyMessageType, ProxyCallback, ProxyMessage} from './store-interface.js';
import {DirectStorageEndpoint} from './direct-storage-endpoint.js';

// A representation of an active store. Subclasses of this class provide specific
// behaviour as controlled by the provided StorageMode.
Expand Down Expand Up @@ -64,47 +63,9 @@ export abstract class ActiveStore<T extends CRDTTypeRecord>
abstract async onProxyMessage(message: ProxyMessage<T>): Promise<void>;
abstract reportExceptionInHost(exception: PropagatedException): void;

getStorageEndpoint() {
const store = this;
let id: number;
return {
get storeInfo() { return store.storeInfo; },
async onProxyMessage(message: ProxyMessage<T>): Promise<void> {
message.id = id!;
noAwait(store.onProxyMessage(message));
},

setCallback(callback: ProxyCallback<T>) {
id = store.on(callback);
},
reportExceptionInHost(exception: PropagatedException): void {
store.reportExceptionInHost(exception);
},
getChannelConstructor(): ChannelConstructor {
// TODO(shans): implement so that we can use references outside of the PEC.
return {
generateID() {
return null;
},
idGenerator: null,
getStorageProxyMuxer() {
throw new Error('References not yet supported outside of the PEC');
},
reportExceptionInHost(exception: PropagatedException): void {
store.reportExceptionInHost(exception);
}
};
},
async idle(): Promise<void> { return store.idle(); },
async close(): Promise<void> {
if (id) {
return store.off(id);
}
}
};
getStorageEndpoint(storeInfo: StoreInfo<CRDTTypeRecordToType<T>>) {
return new DirectStorageEndpoint<T>(this);
}
// getStorageEndpoint() {
// }
}

export type StoreConstructor = {
Expand Down
60 changes: 60 additions & 0 deletions src/runtime/storage/direct-storage-endpoint.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@

/**
* @license
* Copyright (c) 2020 Google Inc. All rights reserved.
* This code may only be used under the BSD style license found at
* http://polymer.github.io/LICENSE.txt
* Code distributed by Google as part of this project is also
* subject to an additional IP rights grant found at
* http://polymer.github.io/PATENTS.txt
*/

import {StorageCommunicationEndpoint, ProxyMessage, ProxyCallback, StorageCommunicationEndpointProvider} from './store-interface.js';
import {CRDTTypeRecord} from '../../crdt/lib-crdt.js';
import {ActiveStore} from './active-store.js';
import {ChannelConstructor} from '../channel-constructor.js';
import {PropagatedException} from '../arc-exceptions.js';
import {noAwait} from '../../utils/lib-utils.js';

export class DirectStorageEndpoint<T extends CRDTTypeRecord> implements StorageCommunicationEndpoint<T> {
private id = 0;

constructor(private readonly store: ActiveStore<T>) {}

get storeInfo() { return this.store.storeInfo; }

async onProxyMessage(message: ProxyMessage<T>): Promise<void> {
message.id = this.id!;
noAwait(this.store.onProxyMessage(message));
}

setCallback(callback: ProxyCallback<T>) {
this.id = this.store.on(callback);
}
reportExceptionInHost(exception: PropagatedException): void {
this.store.reportExceptionInHost(exception);
}

getChannelConstructor(): ChannelConstructor {
const store = this.store;
// TODO(shans): implement so that we can use references outside of the PEC.
return {
generateID() {
return null;
},
idGenerator: null,
getStorageProxyMuxer() {
throw new Error('References not yet supported outside of the PEC');
},
reportExceptionInHost(exception: PropagatedException): void {
store.reportExceptionInHost(exception);
}
};
}
async idle(): Promise<void> { return this.store.idle(); }
async close(): Promise<void> {
if (this.id) {
return this.store.off(this.id);
}
}
}
13 changes: 5 additions & 8 deletions src/runtime/storage/storage-proxy-muxer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@ export class StorageProxyMuxer<T extends CRDTTypeRecord> {
private readonly storageEndpoint: StorageCommunicationEndpoint<T>;
private readonly storageKey: string;
private readonly type: Type;
public readonly storeInfo: StoreInfo<CRDTTypeRecordToType<T>>;

constructor(storeProvider: StorageCommunicationEndpointProvider<T>) {
this.storageEndpoint = storeProvider.getStorageEndpoint(this);
this.storeInfo = storeProvider.storeInfo;
constructor(public readonly storeInfo: StoreInfo<CRDTTypeRecordToType<T>>,
storeProvider: StorageCommunicationEndpointProvider<T>) {
this.storageEndpoint = storeProvider.getStorageEndpoint(storeInfo, this);
this.storageKey = this.storeInfo.storageKey.toString();
this.type = this.storeInfo.type;
}
Expand All @@ -38,16 +37,14 @@ export class StorageProxyMuxer<T extends CRDTTypeRecord> {
this.storageEndpoint.setCallback(this.onMessage.bind(this));
if (!this.storageProxies.hasL(muxId)) {
const storageCommunicationEndpointProvider = this.createStorageCommunicationEndpointProvider(muxId, this.storageEndpoint, this);
this.storageProxies.set(muxId, new StorageProxy(muxId, storageCommunicationEndpointProvider));
this.storageProxies.set(muxId, new StorageProxy(muxId, this.storeInfo, storageCommunicationEndpointProvider));
}
return this.storageProxies.getL(muxId);
}

createStorageCommunicationEndpointProvider(muxId: string, storageEndpoint: StorageCommunicationEndpoint<T>, storageProxyMuxer: StorageProxyMuxer<T>): StorageCommunicationEndpointProvider<T> {
const storeInfo = new StoreInfo({id: this.storeInfo.id, type: this.type.getContainedType(), storageKey: this.storeInfo.storageKey}) as StoreInfo<CRDTTypeRecordToType<T>>;
return {
get storeInfo(): StoreInfo<CRDTTypeRecordToType<T>> { return storeInfo; },
getStorageEndpoint(): StorageCommunicationEndpoint<T> {
getStorageEndpoint(storeInfo: StoreInfo<CRDTTypeRecordToType<T>>): StorageCommunicationEndpoint<T> {
return {
get storeInfo(): StoreInfo<CRDTTypeRecordToType<T>> { return this.storeInfo; },
async onProxyMessage(message: ProxyMessage<T>): Promise<void> {
Expand Down
18 changes: 13 additions & 5 deletions src/runtime/storage/storage-proxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,20 @@ export class StorageProxy<T extends CRDTTypeRecord> {
readonly storageKey: string;
readonly ttl: Ttl;

// Note: as a next step StorageProxy ctor will be accepting `StorageCommunicationEndpoint`
// as parameter, instead of currently `StorageCommunicationEndpointProvider` and
// `StorInfo`. `StorageEndpointManager` will implement `StorageCommunicationEndpointProvider`
// and creating `ActiveStore`, which will be implementing `StorageCommunicationEndpoint`.
constructor(
apiChannelId: string,
storeInfo: StoreInfo<CRDTTypeRecordToType<T>>,
storeProvider: StorageCommunicationEndpointProvider<T>,
ttl = Ttl.infinite()) {
this.apiChannelId = apiChannelId;
this.store = storeProvider.getStorageEndpoint(this);
this.type = storeProvider.storeInfo.type;
this.store = storeProvider.getStorageEndpoint(storeInfo, this);
this.type = storeInfo.type;
this.crdt = new (this.type.crdtInstanceConstructor<T>())();
this.storageKey = storeProvider.storeInfo.storageKey ? storeProvider.storeInfo.storageKey.toString() : null;
this.storageKey = storeInfo.storageKey ? storeInfo.storageKey.toString() : null;
this.ttl = ttl;
this.scheduler = new StorageProxyScheduler<T>();
}
Expand Down Expand Up @@ -305,8 +310,11 @@ export class StorageProxy<T extends CRDTTypeRecord> {

export class NoOpStorageProxy<T extends CRDTTypeRecord> extends StorageProxy<T> {
constructor() {
// tslint:disable-next-line: no-any
super(null, {getStorageEndpoint() {}, storeInfo: new StoreInfo({id: null, type: EntityType.make([], {}) as any as CRDTTypeRecordToType<T>})} as ActiveStore<T>);
super(null,
// tslint:disable-next-line: no-any
new StoreInfo({id: null, type: EntityType.make([], {}) as any as CRDTTypeRecordToType<T>}),
{getStorageEndpoint(storeInfo: StoreInfo<CRDTTypeRecordToType<T>>) {}} as ActiveStore<T>
);
}
async idle(): Promise<void> {
return new Promise(resolve => {});
Expand Down
15 changes: 11 additions & 4 deletions src/runtime/storage/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,13 @@ export async function newHandle<T extends Type>(
}

export function handleForActiveStore<T extends CRDTTypeRecord>(
storeInfo: StoreInfo<CRDTTypeRecordToType<T>>,
store: StorageCommunicationEndpointProvider<T>,
arc: ArcLike,
options: HandleOptions = {}
): ToHandle<T> {
const type = options.type || store.storeInfo.type;
const storageKey = store.storeInfo.storageKey.toString();
const type = options.type || storeInfo.type;
const storageKey = storeInfo.storageKey.toString();

const idGenerator = arc.idGenerator;
const particle = options.particle || null;
Expand All @@ -138,10 +139,11 @@ export function handleForActiveStore<T extends CRDTTypeRecord>(
const generateID = arc.generateID ? () => arc.generateID().toString() : () => '';
if (store instanceof DirectStoreMuxer) {
const proxyMuxer = new StorageProxyMuxer<CRDTMuxEntity>(
storeInfo as StoreInfo<MuxEntityType>,
store as StorageCommunicationEndpointProvider<CRDTMuxEntity>);
return new EntityHandleFactory(proxyMuxer) as ToHandle<T>;
} else {
const proxy = new StorageProxy<T>(store.storeInfo.id, store, options.ttl);
const proxy = new StorageProxy<T>(storeInfo.id, storeInfo, store, options.ttl);
if (type instanceof SingletonType) {
// tslint:disable-next-line: no-any
return new SingletonHandle(generateID(), proxy as any, idGenerator, particle, canRead, canWrite, name) as ToHandle<T>;
Expand All @@ -153,6 +155,11 @@ export function handleForActiveStore<T extends CRDTTypeRecord>(
}

export async function handleForStoreInfo<T extends Type>(storeInfo: StoreInfo<T>, arc: ArcLike, options?: HandleOptions): Promise<ToHandle<TypeToCRDTTypeRecord<T>>> {
return handleForActiveStore(await arc.storageManager.getActiveStore(storeInfo), arc, options) as ToHandle<TypeToCRDTTypeRecord<T>>;
return handleForActiveStore(
storeInfo as unknown as StoreInfo<CRDTTypeRecordToType<TypeToCRDTTypeRecord<T>>>,
await arc.storageManager.getActiveStore(storeInfo),
arc,
options
) as ToHandle<TypeToCRDTTypeRecord<T>>;
}

3 changes: 1 addition & 2 deletions src/runtime/storage/store-interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,5 @@ export interface StorageCommunicationEndpoint<T extends CRDTTypeRecord> {
}

export interface StorageCommunicationEndpointProvider<T extends CRDTTypeRecord> {
storeInfo: StoreInfo<CRDTTypeRecordToType<T>>;
getStorageEndpoint(storageProxy?: StorageProxy<T> | StorageProxyMuxer<T>): StorageCommunicationEndpoint<T>;
getStorageEndpoint(storeInfo: StoreInfo<CRDTTypeRecordToType<T>>, storageProxy?: StorageProxy<T> | StorageProxyMuxer<T>): StorageCommunicationEndpoint<T>;
}
5 changes: 4 additions & 1 deletion src/runtime/storage/tests/entity-handle-factory-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ describe('entity handle factory', () => {
fooEntity2CRDT.applyOperation({type: EntityOpTypes.Set, field: 'value', value: {id: 'Text', value: 'OtherText'}, actor: 'me', versionMap: {'me': 1}});

const mockDirectStoreMuxer = new MockDirectStoreMuxer<CRDTMuxEntity>(new MuxType(fooEntityType));
const storageProxyMuxer = new StorageProxyMuxer(mockDirectStoreMuxer as DirectStoreMuxer<Identified, Identified, CRDTMuxEntity>);
const storageProxyMuxer = new StorageProxyMuxer(
mockDirectStoreMuxer.storeInfo,
mockDirectStoreMuxer as DirectStoreMuxer<Identified, Identified, CRDTMuxEntity>
);
const entityHandleProducer = new EntityHandleFactory(storageProxyMuxer);

const entityHandle1 = entityHandleProducer.getHandle(fooMuxId1);
Expand Down
18 changes: 7 additions & 11 deletions src/runtime/storage/tests/handle-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import {MockParticle, MockStore} from '../testing/test-storage.js';
import {Manifest} from '../../manifest.js';
import {EntityClass, Entity, SerializedEntity} from '../../entity.js';
import {SYMBOL_INTERNALS} from '../../symbols.js';
import {CRDTEntityCollection, ActiveCollectionEntityStore, CollectionEntityType} from '../storage.js';
import {CRDTEntityCollection, ActiveCollectionEntityStore, CollectionEntityType, CRDTEntitySingleton, CRDTMuxEntity} from '../storage.js';
import {Reference} from '../../reference.js';
import {VersionMap, CollectionOperation, CollectionOpTypes, CRDTCollectionTypeRecord,
CRDTCollection, CRDTSingletonTypeRecord, SingletonOperation, SingletonOpTypes, CRDTSingleton,
Expand All @@ -31,12 +31,10 @@ import {StoreInfo} from '../store-info.js';
async function getCollectionHandle(primitiveType: EntityType, particle?: MockParticle, canRead=true, canWrite=true):
Promise<CollectionHandle<Entity>> {
const fakeParticle: Particle = (particle || new MockParticle()) as unknown as Particle;
const store = new MockStore<CRDTEntityCollection>(new CollectionType(primitiveType)) as unknown as ActiveCollectionEntityStore;
const mockStore = new MockStore<CRDTEntityCollection>(new CollectionType(primitiveType));
const handle = new CollectionHandle(
'me',
new StorageProxy(
'id',
new MockStore<CRDTCollectionTypeRecord<SerializedEntity>>(new CollectionType(primitiveType))),
new StorageProxy('id', mockStore.storeInfo, mockStore),
IdGenerator.newSession(),
fakeParticle,
canRead,
Expand All @@ -53,11 +51,10 @@ async function getCollectionHandle(primitiveType: EntityType, particle?: MockPar
async function getSingletonHandle(primitiveType: EntityType, particle?: MockParticle, canRead=true, canWrite=true):
Promise<SingletonHandle<Entity>> {
const fakeParticle: Particle = (particle || new MockParticle()) as unknown as Particle;
const mockStore = new MockStore<CRDTEntitySingleton>(new SingletonType(primitiveType));
const handle = new SingletonHandle(
'me',
new StorageProxy(
'id',
new MockStore<CRDTSingletonTypeRecord<SerializedEntity>>(new SingletonType(primitiveType))),
new StorageProxy('id', mockStore.storeInfo, mockStore),
IdGenerator.newSession(),
fakeParticle,
canRead,
Expand All @@ -74,9 +71,8 @@ async function getSingletonHandle(primitiveType: EntityType, particle?: MockPart
async function getEntityHandle(schema: Schema, muxId: string, particle?: MockParticle, canRead=true, canWrite=true):
Promise<EntityHandle<Entity>> {
const fakeParticle: Particle = (particle || new MockParticle()) as unknown as Particle;
const storageProxy = new StorageProxy(
'id',
new MockStore<CRDTEntityTypeRecord<Identified, Identified>>(new MuxType<EntityType>(new EntityType(schema))));
const mockStore = new MockStore<CRDTMuxEntity>(new MuxType<EntityType>(new EntityType(schema)));
const storageProxy = new StorageProxy('id', mockStore.storeInfo, mockStore);
const handle = new EntityHandle<Entity>(
'me',
storageProxy,
Expand Down
Loading

0 comments on commit 638a50a

Please sign in to comment.