Skip to content

Commit

Permalink
[server] Broadcast headless updates to subscribers WEB-598, WEB-600 (g…
Browse files Browse the repository at this point in the history
…itpod-io#18215)

* [server] Publish events during workspace starter

* retest

* fix

* fix

* retest

* retest

* retest

* retest

* retest

* retest

* retest

* [server] Broadcast prebuild updates from redis

* [server] Broadcast headless updates to subscribers

* Fix

* Fix

* Fix

* fix

* fix

* retest
  • Loading branch information
easyCZ authored Jul 12, 2023
1 parent f59c860 commit 3400259
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 10 deletions.
7 changes: 7 additions & 0 deletions components/gitpod-protocol/src/redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
* See License.AGPL.txt in the project root for license information.
*/

import { HeadlessWorkspaceEventType } from "./headless-workspace-log";
import { PrebuiltWorkspaceState } from "./protocol";

export const WorkspaceInstanceUpdatesChannel = "chan:workspace-instances";
export const PrebuildUpdatesChannel = "chan:prebuilds";
export const HeadlessUpdatesChannel = "chan:headless";

export type RedisWorkspaceInstanceUpdate = {
ownerID: string;
Expand All @@ -21,3 +23,8 @@ export type RedisPrebuildUpdate = {
workspaceID: string;
projectID: string;
};

export type RedisHeadlessUpdate = {
workspaceID: string;
type: HeadlessWorkspaceEventType;
};
49 changes: 46 additions & 3 deletions components/server/src/messaging/redis-subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,18 @@
import {
HeadlessWorkspaceEventListener,
LocalMessageBroker,
LocalRabbitMQBackedMessageBroker,
PrebuildUpdateListener,
WorkspaceInstanceUpdateListener,
} from "./local-message-broker";
import { inject, injectable } from "inversify";
import {
Disposable,
DisposableCollection,
HeadlessUpdatesChannel,
PrebuildUpdatesChannel,
PrebuildWithStatus,
RedisHeadlessUpdate,
RedisPrebuildUpdate,
RedisWorkspaceInstanceUpdate,
WorkspaceInstanceUpdatesChannel,
Expand All @@ -39,11 +42,12 @@ export class RedisSubscriber implements LocalMessageBroker {

protected workspaceInstanceUpdateListeners: Map<string, WorkspaceInstanceUpdateListener[]> = new Map();
protected prebuildUpdateListeners: Map<string, PrebuildUpdateListener[]> = new Map();
protected headlessWorkspaceEventListeners: Map<string, HeadlessWorkspaceEventListener[]> = new Map();

protected readonly disposables = new DisposableCollection();

async start(): Promise<void> {
const channels = [WorkspaceInstanceUpdatesChannel, PrebuildUpdatesChannel];
const channels = [WorkspaceInstanceUpdatesChannel, PrebuildUpdatesChannel, HeadlessUpdatesChannel];

for (const chan of channels) {
await this.redis.subscribe(chan);
Expand Down Expand Up @@ -91,6 +95,17 @@ export class RedisSubscriber implements LocalMessageBroker {
}
return this.onPrebuildUpdate(JSON.parse(message) as RedisPrebuildUpdate);

case HeadlessUpdatesChannel:
const headlessTypeEnabled = await this.isRedisPubSubByTypeEnabled("prebuild-updatable");
if (!headlessTypeEnabled) {
log.debug("[redis] Redis headless update is disabled through feature flag", {
channel,
message,
});
return;
}
return this.onHeadlessUpdate(JSON.parse(message) as RedisHeadlessUpdate);

default:
throw new Error(`Redis Pub/Sub received message on unknown channel: ${channel}`);
}
Expand Down Expand Up @@ -163,6 +178,29 @@ export class RedisSubscriber implements LocalMessageBroker {
}
}

private async onHeadlessUpdate(update: RedisHeadlessUpdate): Promise<void> {
log.debug("[redis] Received prebuild update", { update });

if (!update.type || !update.workspaceID) {
return;
}

const listeners =
this.headlessWorkspaceEventListeners.get(LocalRabbitMQBackedMessageBroker.UNDEFINED_KEY) || [];
if (listeners.length === 0) {
return;
}

const ctx = {};
for (const l of listeners) {
try {
l(ctx, update);
} catch (err) {
log.error("Failed to broadcast headless update.", update, err);
}
}
}

async stop() {
this.disposables.dispose();
}
Expand All @@ -172,8 +210,13 @@ export class RedisSubscriber implements LocalMessageBroker {
}

listenForPrebuildUpdatableEvents(listener: HeadlessWorkspaceEventListener): Disposable {
// TODO: not implemented
return Disposable.create(() => {});
// we're being cheap here in re-using a map where it just needs to be a plain array.
return this.doRegister(
LocalRabbitMQBackedMessageBroker.UNDEFINED_KEY,
listener,
this.headlessWorkspaceEventListeners,
"prebuild-updatable",
);
}

listenForWorkspaceInstanceUpdates(userId: string, listener: WorkspaceInstanceUpdateListener): Disposable {
Expand Down
17 changes: 15 additions & 2 deletions components/server/src/redis/publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
import { inject, injectable } from "inversify";
import { log } from "@gitpod/gitpod-protocol/lib/util/logging";
import {
HeadlessUpdatesChannel,
PrebuildUpdatesChannel,
RedisHeadlessUpdate,
RedisPrebuildUpdate,
RedisWorkspaceInstanceUpdate,
WorkspaceInstanceUpdatesChannel,
Expand Down Expand Up @@ -52,8 +54,19 @@ export class RedisPublisher {
}
}

async publishHeadlessUpdate(): Promise<void> {
async publishHeadlessUpdate(update: RedisHeadlessUpdate): Promise<void> {
log.debug("[redis] Publish headless udpate invoked.");
reportUpdatePublished("headless");

let err: Error | undefined;
try {
const serialized = JSON.stringify(update);
await this.client.publish(HeadlessUpdatesChannel, serialized);
log.debug("[redis] Succesfully published headless update.", update);
} catch (e) {
err = e;
log.error("[redis] Failed to publish headless update.", e, update);
} finally {
reportUpdatePublished("headless", err);
}
}
}
5 changes: 4 additions & 1 deletion components/server/src/workspace/workspace-starter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -848,7 +848,10 @@ export class WorkspaceStarter {
type: HeadlessWorkspaceEventType.Failed,
workspaceID: workspace.id, // required in prebuild-queue-maintainer.ts
});
await this.publisher.publishHeadlessUpdate();
await this.publisher.publishHeadlessUpdate({
type: HeadlessWorkspaceEventType.Failed,
workspaceID: workspace.id,
});
}
}
} catch (err) {
Expand Down
9 changes: 7 additions & 2 deletions components/ws-manager-bridge/src/prebuild-updater.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import { inject, injectable } from "inversify";
import { TraceContext } from "@gitpod/gitpod-protocol/lib/util/tracing";
import { WorkspaceStatus, WorkspaceType } from "@gitpod/ws-manager/lib";
import { WorkspaceInstance } from "@gitpod/gitpod-protocol";
import { HeadlessWorkspaceEventType, WorkspaceInstance } from "@gitpod/gitpod-protocol";
import { log, LogContext } from "@gitpod/gitpod-protocol/lib/util/logging";
import { PrebuildStateMapper } from "./prebuild-state-mapper";
import { DBWithTracing, TracedWorkspaceDB } from "@gitpod/gitpod-db/lib/traced-db";
Expand Down Expand Up @@ -89,7 +89,12 @@ export class PrebuildUpdater {
type: update.type,
workspaceID: workspaceId,
});
await this.publisher.publishHeadlessUpdate();
if (!HeadlessWorkspaceEventType.isRunning(update.type)) {
await this.publisher.publishHeadlessUpdate({
type: update.type,
workspaceID: workspaceId,
});
}

// prebuild info
const info = (await this.workspaceDB.trace({ span }).findPrebuildInfos([updatedPrebuild.id]))[0];
Expand Down
17 changes: 15 additions & 2 deletions components/ws-manager-bridge/src/redis/publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import { log } from "@gitpod/gitpod-protocol/lib/util/logging";
import { Metrics } from "../metrics";
import { RedisClient } from "./client";
import {
HeadlessUpdatesChannel,
PrebuildUpdatesChannel,
RedisHeadlessUpdate,
RedisPrebuildUpdate,
RedisWorkspaceInstanceUpdate,
WorkspaceInstanceUpdatesChannel,
Expand Down Expand Up @@ -52,8 +54,19 @@ export class RedisPublisher {
}
}

async publishHeadlessUpdate(): Promise<void> {
async publishHeadlessUpdate(update: RedisHeadlessUpdate): Promise<void> {
log.debug("[redis] Publish headless udpate invoked.");
this.metrics.reportUpdatePublished("headless");

let err: Error | undefined;
try {
const serialized = JSON.stringify(update);
await this.client.get().publish(HeadlessUpdatesChannel, serialized);
log.debug("[redis] Succesfully published headless update.", update);
} catch (e) {
err = e;
log.error("[redis] Failed to publish headless update.", e, update);
} finally {
this.metrics.reportUpdatePublished("headless", err);
}
}
}

0 comments on commit 3400259

Please sign in to comment.