Skip to content

Commit

Permalink
chore(server): refactor locks (immich-app#5953)
Browse files Browse the repository at this point in the history
* lock refactor

* add mocks

* add await

* move database repo injection to service

* update tests

* add mock implementation

* remove unused imports

* this
  • Loading branch information
mertalev authored Dec 27, 2023
1 parent 1af27fc commit 8119d4b
Show file tree
Hide file tree
Showing 11 changed files with 90 additions and 72 deletions.
13 changes: 9 additions & 4 deletions server/src/domain/metadata/metadata.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
newAssetRepositoryMock,
newCommunicationRepositoryMock,
newCryptoRepositoryMock,
newDatabaseRepositoryMock,
newJobRepositoryMock,
newMediaRepositoryMock,
newMetadataRepositoryMock,
Expand All @@ -25,6 +26,7 @@ import {
IAssetRepository,
ICommunicationRepository,
ICryptoRepository,
IDatabaseRepository,
IJobRepository,
IMediaRepository,
IMetadataRepository,
Expand All @@ -50,6 +52,7 @@ describe(MetadataService.name, () => {
let personMock: jest.Mocked<IPersonRepository>;
let storageMock: jest.Mocked<IStorageRepository>;
let communicationMock: jest.Mocked<ICommunicationRepository>;
let databaseMock: jest.Mocked<IDatabaseRepository>;
let sut: MetadataService;

beforeEach(async () => {
Expand All @@ -64,19 +67,21 @@ describe(MetadataService.name, () => {
communicationMock = newCommunicationRepositoryMock();
storageMock = newStorageRepositoryMock();
mediaMock = newMediaRepositoryMock();
databaseMock = newDatabaseRepositoryMock();

sut = new MetadataService(
albumMock,
assetMock,
communicationMock,
cryptoRepository,
databaseMock,
jobMock,
metadataMock,
storageMock,
configMock,
mediaMock,
metadataMock,
moveMock,
communicationMock,
personMock,
storageMock,
configMock,
);
});

Expand Down
13 changes: 8 additions & 5 deletions server/src/domain/metadata/metadata.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ import { usePagination } from '../domain.util';
import { IBaseJob, IEntityJob, ISidecarWriteJob, JOBS_ASSET_PAGINATION_SIZE, JobName, QueueName } from '../job';
import {
ClientEvent,
DatabaseLock,
ExifDuration,
IAlbumRepository,
IAssetRepository,
ICommunicationRepository,
ICryptoRepository,
IDatabaseRepository,
IJobRepository,
IMediaRepository,
IMetadataRepository,
Expand Down Expand Up @@ -100,15 +102,16 @@ export class MetadataService {
constructor(
@Inject(IAlbumRepository) private albumRepository: IAlbumRepository,
@Inject(IAssetRepository) private assetRepository: IAssetRepository,
@Inject(ICommunicationRepository) private communicationRepository: ICommunicationRepository,
@Inject(ICryptoRepository) private cryptoRepository: ICryptoRepository,
@Inject(IDatabaseRepository) private databaseRepository: IDatabaseRepository,
@Inject(IJobRepository) private jobRepository: IJobRepository,
@Inject(IMetadataRepository) private repository: IMetadataRepository,
@Inject(IStorageRepository) private storageRepository: IStorageRepository,
@Inject(ISystemConfigRepository) configRepository: ISystemConfigRepository,
@Inject(IMediaRepository) private mediaRepository: IMediaRepository,
@Inject(IMetadataRepository) private repository: IMetadataRepository,
@Inject(IMoveRepository) moveRepository: IMoveRepository,
@Inject(ICommunicationRepository) private communicationRepository: ICommunicationRepository,
@Inject(IPersonRepository) personRepository: IPersonRepository,
@Inject(IStorageRepository) private storageRepository: IStorageRepository,
@Inject(ISystemConfigRepository) configRepository: ISystemConfigRepository,
) {
this.configCore = SystemConfigCore.create(configRepository);
this.storageCore = StorageCore.create(assetRepository, moveRepository, personRepository, storageRepository);
Expand All @@ -128,7 +131,7 @@ export class MetadataService {

try {
await this.jobRepository.pause(QueueName.METADATA_EXTRACTION);
await this.repository.init();
await this.databaseRepository.withLock(DatabaseLock.GeodataImport, () => this.repository.init());
await this.jobRepository.resume(QueueName.METADATA_EXTRACTION);

this.logger.log(`Initialized local reverse geocoder`);
Expand Down
8 changes: 8 additions & 0 deletions server/src/domain/repositories/database.repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,19 @@ export enum DatabaseExtension {
VECTORS = 'vectors',
}

export enum DatabaseLock {
GeodataImport = 100,
CLIPDimSize = 512,
}

export const IDatabaseRepository = 'IDatabaseRepository';

export interface IDatabaseRepository {
getExtensionVersion(extName: string): Promise<Version | null>;
getPostgresVersion(): Promise<Version>;
createExtension(extension: DatabaseExtension): Promise<void>;
runMigrations(options?: { transaction?: 'all' | 'none' | 'each' }): Promise<void>;
withLock<R>(lock: DatabaseLock, callback: () => Promise<R>): Promise<R>;
isBusy(lock: DatabaseLock): boolean;
wait(lock: DatabaseLock): Promise<void>;
}
6 changes: 5 additions & 1 deletion server/src/domain/smart-info/smart-info.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { AssetEntity, SystemConfigKey } from '@app/infra/entities';
import {
assetStub,
newAssetRepositoryMock,
newDatabaseRepositoryMock,
newJobRepositoryMock,
newMachineLearningRepositoryMock,
newSmartInfoRepositoryMock,
Expand All @@ -10,6 +11,7 @@ import {
import { JobName } from '../job';
import {
IAssetRepository,
IDatabaseRepository,
IJobRepository,
IMachineLearningRepository,
ISmartInfoRepository,
Expand All @@ -31,14 +33,16 @@ describe(SmartInfoService.name, () => {
let jobMock: jest.Mocked<IJobRepository>;
let smartMock: jest.Mocked<ISmartInfoRepository>;
let machineMock: jest.Mocked<IMachineLearningRepository>;
let databaseMock: jest.Mocked<IDatabaseRepository>;

beforeEach(async () => {
assetMock = newAssetRepositoryMock();
configMock = newSystemConfigRepositoryMock();
smartMock = newSmartInfoRepositoryMock();
jobMock = newJobRepositoryMock();
machineMock = newMachineLearningRepositoryMock();
sut = new SmartInfoService(assetMock, configMock, jobMock, smartMock, machineMock);
databaseMock = newDatabaseRepositoryMock();
sut = new SmartInfoService(assetMock, databaseMock, jobMock, machineMock, smartMock, configMock);

assetMock.getByIds.mockResolvedValue([asset]);
});
Expand Down
16 changes: 13 additions & 3 deletions server/src/domain/smart-info/smart-info.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import { setTimeout } from 'timers/promises';
import { usePagination } from '../domain.util';
import { IBaseJob, IEntityJob, JOBS_ASSET_PAGINATION_SIZE, JobName, QueueName } from '../job';
import {
DatabaseLock,
IAssetRepository,
IDatabaseRepository,
IJobRepository,
IMachineLearningRepository,
ISmartInfoRepository,
Expand All @@ -20,10 +22,11 @@ export class SmartInfoService {

constructor(
@Inject(IAssetRepository) private assetRepository: IAssetRepository,
@Inject(ISystemConfigRepository) configRepository: ISystemConfigRepository,
@Inject(IDatabaseRepository) private databaseRepository: IDatabaseRepository,
@Inject(IJobRepository) private jobRepository: IJobRepository,
@Inject(ISmartInfoRepository) private repository: ISmartInfoRepository,
@Inject(IMachineLearningRepository) private machineLearning: IMachineLearningRepository,
@Inject(ISmartInfoRepository) private repository: ISmartInfoRepository,
@Inject(ISystemConfigRepository) configRepository: ISystemConfigRepository,
) {
this.configCore = SystemConfigCore.create(configRepository);
}
Expand All @@ -41,7 +44,9 @@ export class SmartInfoService {

const { machineLearning } = await this.configCore.getConfig();

await this.repository.init(machineLearning.clip.modelName);
await this.databaseRepository.withLock(DatabaseLock.CLIPDimSize, () =>
this.repository.init(machineLearning.clip.modelName),
);

await this.jobRepository.resume(QueueName.SMART_SEARCH);
}
Expand Down Expand Up @@ -84,6 +89,11 @@ export class SmartInfoService {
machineLearning.clip,
);

if (this.databaseRepository.isBusy(DatabaseLock.CLIPDimSize)) {
this.logger.verbose(`Waiting for CLIP dimension size to be updated`);
await this.databaseRepository.wait(DatabaseLock.CLIPDimSize);
}

await this.repository.upsert({ assetId: asset.id }, clipEmbedding);

return true;
Expand Down
41 changes: 0 additions & 41 deletions server/src/infra/database-locks.ts

This file was deleted.

1 change: 0 additions & 1 deletion server/src/infra/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
export * from './database-locks';
export * from './database.config';
export * from './infra.config';
export * from './infra.module';
Expand Down
35 changes: 34 additions & 1 deletion server/src/infra/repositories/database.repository.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import { DatabaseExtension, IDatabaseRepository, Version } from '@app/domain';
import { DatabaseExtension, DatabaseLock, IDatabaseRepository, Version } from '@app/domain';
import { Injectable } from '@nestjs/common';
import { InjectDataSource } from '@nestjs/typeorm';
import AsyncLock from 'async-lock';
import { DataSource } from 'typeorm';

@Injectable()
export class DatabaseRepository implements IDatabaseRepository {
readonly asyncLock = new AsyncLock();

constructor(@InjectDataSource() private dataSource: DataSource) {}

async getExtensionVersion(extension: DatabaseExtension): Promise<Version | null> {
Expand All @@ -25,4 +28,34 @@ export class DatabaseRepository implements IDatabaseRepository {
async runMigrations(options?: { transaction?: 'all' | 'none' | 'each' }): Promise<void> {
await this.dataSource.runMigrations(options);
}

async withLock<R>(lock: DatabaseLock, callback: () => Promise<R>): Promise<R> {
let res;
await this.asyncLock.acquire(DatabaseLock[lock], async () => {
try {
await this.acquireLock(lock);
res = await callback();
} finally {
await this.releaseLock(lock);
}
});

return res as R;
}

isBusy(lock: DatabaseLock): boolean {
return this.asyncLock.isBusy(DatabaseLock[lock]);
}

async wait(lock: DatabaseLock): Promise<void> {
await this.asyncLock.acquire(DatabaseLock[lock], () => {});
}

private async acquireLock(lock: DatabaseLock): Promise<void> {
return this.dataSource.query('SELECT pg_advisory_lock($1)', [lock]);
}

private async releaseLock(lock: DatabaseLock): Promise<void> {
return this.dataSource.query('SELECT pg_advisory_unlock($1)', [lock]);
}
}
19 changes: 10 additions & 9 deletions server/src/infra/repositories/metadata.repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import {
ISystemMetadataRepository,
ReverseGeocodeResult,
} from '@app/domain';
import { DatabaseLock, RequireLock } from '@app/infra';
import { GeodataAdmin1Entity, GeodataAdmin2Entity, GeodataPlacesEntity, SystemMetadataKey } from '@app/infra/entities';
import { ImmichLogger } from '@app/infra/logger';
import { Inject } from '@nestjs/common';
Expand Down Expand Up @@ -34,7 +33,6 @@ export class MetadataRepository implements IMetadataRepository {

private logger = new ImmichLogger(MetadataRepository.name);

@RequireLock(DatabaseLock.GeodataImport)
async init(): Promise<void> {
this.logger.log('Initializing metadata repository');
const geodataDate = await readFile('/usr/src/resources/geodata-date.txt', 'utf8');
Expand All @@ -46,7 +44,17 @@ export class MetadataRepository implements IMetadataRepository {
}

this.logger.log('Importing geodata to database from file');
await this.importGeodata();

await this.systemMetadataRepository.set(SystemMetadataKey.REVERSE_GEOCODING_STATE, {
lastUpdate: geodataDate,
lastImportFileName: CITIES_FILE,
});

this.logger.log('Geodata import completed');
}

private async importGeodata() {
const queryRunner = this.dataSource.createQueryRunner();
await queryRunner.connect();

Expand All @@ -65,13 +73,6 @@ export class MetadataRepository implements IMetadataRepository {
} finally {
await queryRunner.release();
}

await this.systemMetadataRepository.set(SystemMetadataKey.REVERSE_GEOCODING_STATE, {
lastUpdate: geodataDate,
lastImportFileName: CITIES_FILE,
});

this.logger.log('Geodata import completed');
}

private async loadGeodataToTableFromFile<T extends GeoEntity>(
Expand Down
7 changes: 0 additions & 7 deletions server/src/infra/repositories/smart-info.repository.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { Embedding, EmbeddingSearch, ISmartInfoRepository } from '@app/domain';
import { getCLIPModelInfo } from '@app/domain/smart-info/smart-info.constant';
import { DatabaseLock, RequireLock, asyncLock } from '@app/infra';
import { AssetEntity, AssetFaceEntity, SmartInfoEntity, SmartSearchEntity } from '@app/infra/entities';
import { ImmichLogger } from '@app/infra/logger';
import { Injectable } from '@nestjs/common';
Expand Down Expand Up @@ -121,18 +120,12 @@ export class SmartInfoRepository implements ISmartInfoRepository {
}

private async upsertEmbedding(assetId: string, embedding: number[]): Promise<void> {
if (asyncLock.isBusy(DatabaseLock[DatabaseLock.CLIPDimSize])) {
this.logger.verbose(`Waiting for CLIP dimension size to be updated`);
await asyncLock.acquire(DatabaseLock[DatabaseLock.CLIPDimSize], () => {});
}

await this.smartSearchRepository.upsert(
{ assetId, embedding: () => asVector(embedding, true) },
{ conflictPaths: ['assetId'] },
);
}

@RequireLock(DatabaseLock.CLIPDimSize)
private async updateDimSize(dimSize: number): Promise<void> {
if (!isValidInteger(dimSize, { min: 1, max: 2 ** 16 })) {
throw new Error(`Invalid CLIP dimension size: ${dimSize}`);
Expand Down
3 changes: 3 additions & 0 deletions server/test/repositories/database.repository.mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,8 @@ export const newDatabaseRepositoryMock = (): jest.Mocked<IDatabaseRepository> =>
getPostgresVersion: jest.fn().mockResolvedValue(new Version(14, 0, 0)),
createExtension: jest.fn().mockImplementation(() => Promise.resolve()),
runMigrations: jest.fn(),
withLock: jest.fn().mockImplementation((_, func: <R>() => Promise<R>) => func()),
isBusy: jest.fn(),
wait: jest.fn(),
};
};

0 comments on commit 8119d4b

Please sign in to comment.