Skip to content

Commit

Permalink
[ENH]: Use new log service (chroma-core#1971)
Browse files Browse the repository at this point in the history
## Description of changes
*Summarize the changes made by this PR.*
 - Improvements & Bug fixes
	 - Use refactored log service

## Test plan
*How are these changes tested?*
Tested with tilt
  • Loading branch information
nicolasgere authored Apr 5, 2024
1 parent cfc3a75 commit dab637f
Show file tree
Hide file tree
Showing 29 changed files with 197 additions and 105 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
**/.DS_Store

**/__pycache__

go/bin/
go/**/testdata/
go/coordinator/bin/

Expand Down
30 changes: 28 additions & 2 deletions Tiltfile
Original file line number Diff line number Diff line change
@@ -1,17 +1,42 @@
update_settings(max_parallel_updates=6)

docker_build(
'local:postgres',
context='./k8s/test/postgres',
dockerfile='./k8s/test/postgres/Dockerfile'
)

docker_build(
'local:log-service',
'.',
only=['go/'],
dockerfile='./go/Dockerfile',
target='logservice'
)


docker_build(
'local:sysdb-migration',
'.',
only=['go/'],
dockerfile='./go/Dockerfile.migration'
dockerfile='./go/Dockerfile.migration',
target='sysdb-migration'
)

docker_build(
'local:logservice-migration',
'.',
only=['go/'],
dockerfile='./go/Dockerfile.migration',
target="logservice-migration"
)

docker_build(
'local:sysdb',
'.',
only=['go/', 'idl/'],
dockerfile='./go/Dockerfile'
dockerfile='./go/Dockerfile',
target='sysdb'
)

docker_build(
Expand Down Expand Up @@ -107,6 +132,7 @@ k8s_resource(
k8s_resource('postgres', resource_deps=['k8s_setup', 'namespace'], labels=["infrastructure"])
k8s_resource('pulsar', resource_deps=['k8s_setup', 'namespace'], labels=["infrastructure"], port_forwards=['6650:6650', '8080:8080'])
k8s_resource('sysdb-migration', resource_deps=['postgres', 'namespace'], labels=["infrastructure"])
k8s_resource('logservice-migration', resource_deps=['postgres', 'namespace'], labels=["infrastructure"])
k8s_resource('logservice', resource_deps=['sysdb-migration'], labels=["chroma"], port_forwards='50052:50051')
k8s_resource('sysdb', resource_deps=['pulsar', 'sysdb-migration'], labels=["chroma"], port_forwards='50051:50051')
k8s_resource('frontend-service', resource_deps=['pulsar', 'sysdb', 'logservice'],labels=["chroma"], port_forwards='8000:8000')
Expand Down
4 changes: 2 additions & 2 deletions chromadb/logservice/logservice.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import sys

import grpc

import time
from chromadb.ingest import (
Producer,
Consumer,
Expand Down Expand Up @@ -154,7 +154,7 @@ def pull_logs(
collection_id=str(collection_id),
start_from_offset=start_offset,
batch_size=batch_size,
end_timestamp=-1,
end_timestamp=time.time_ns(),
)
response = self._log_service_stub.PullLogs(request)
return response.records # type: ignore
11 changes: 8 additions & 3 deletions go/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,15 @@ ADD ./go/ ./
ENV GOCACHE=/root/.cache/go-build
RUN --mount=type=cache,target="/root/.cache/go-build" make

FROM debian:bookworm-slim

FROM debian:bookworm-slim as sysdb
COPY --from=builder /build-dir/bin/coordinator .
COPY --from=builder /build-dir/bin/logservice .
ENV PATH=$PATH:./

CMD /bin/bash


FROM debian:bookworm-slim as logservice
WORKDIR /app
COPY --from=builder /build-dir/bin/logservice .
CMD ["./logservice"]

10 changes: 9 additions & 1 deletion go/Dockerfile.migration
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM debian:bookworm-slim
FROM debian:bookworm-slim as sysdb-migration

RUN apt update
RUN apt upgrade -y
Expand All @@ -7,3 +7,11 @@ RUN curl -sSf https://atlasgo.sh | sh -s -- --community

COPY ./go/migrations migrations
COPY ./go/atlas.hcl atlas.hcl

FROM debian:bookworm-slim as logservice-migration
RUN apt update
RUN apt upgrade -y
RUN apt install -y curl jq
RUN curl -sSf https://atlasgo.sh | sh -s -- --community
COPY ./go/database/log/migrations migrations
COPY ./go/database/log/atlas.hcl atlas.hcl
2 changes: 1 addition & 1 deletion go/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ clean:
docker:
docker build -t chroma-coordinator:latest .


### LOG SERVICE
DATABABASE_LOG_DIR := database/log

log_db_clean:
Expand Down
2 changes: 1 addition & 1 deletion go/cmd/coordinator/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func init() {
Cmd.Flags().StringVar(&conf.DBConfig.Password, "password", "chroma", "MetaTable password")
Cmd.Flags().StringVar(&conf.DBConfig.Address, "db-address", "postgres", "MetaTable db address")
Cmd.Flags().IntVar(&conf.DBConfig.Port, "db-port", 5432, "MetaTable db port")
Cmd.Flags().StringVar(&conf.DBConfig.DBName, "db-name", "chroma", "MetaTable db name")
Cmd.Flags().StringVar(&conf.DBConfig.DBName, "db-name", "sysdb", "MetaTable db name")
Cmd.Flags().IntVar(&conf.DBConfig.MaxIdleConns, "max-idle-conns", 10, "MetaTable max idle connections")
Cmd.Flags().IntVar(&conf.DBConfig.MaxOpenConns, "max-open-conns", 10, "MetaTable max open connections")
Cmd.Flags().StringVar(&conf.DBConfig.SslMode, "ssl-mode", "disable", "SSL mode for database connection")
Expand Down
46 changes: 0 additions & 46 deletions go/cmd/logservice/cmd.go

This file was deleted.

52 changes: 31 additions & 21 deletions go/cmd/logservice/main.go
Original file line number Diff line number Diff line change
@@ -1,36 +1,46 @@
package main

import (
"fmt"
"os"

"context"
"github.com/chroma-core/chroma/go/pkg/log/configuration"
"github.com/chroma-core/chroma/go/pkg/log/repository"
"github.com/chroma-core/chroma/go/pkg/log/server"
"github.com/chroma-core/chroma/go/pkg/proto/logservicepb"
"github.com/chroma-core/chroma/go/pkg/utils"
libs "github.com/chroma-core/chroma/go/shared/libs"
"github.com/pingcap/log"
"github.com/rs/zerolog"
"github.com/spf13/cobra"
"go.uber.org/automaxprocs/maxprocs"
"go.uber.org/zap"
"google.golang.org/grpc"
"net"
)

var (
rootCmd = &cobra.Command{
Use: "logservice",
Short: "RecordLog root command",
Long: `RecordLog root command`,
}
)

func init() {
rootCmd.AddCommand(Cmd)
}

func main() {
ctx := context.Background()
// Configure logger
utils.LogLevel = zerolog.DebugLevel
utils.ConfigureLogger()
if _, err := maxprocs.Set(); err != nil {
_, _ = fmt.Fprintln(os.Stderr, err)
os.Exit(1)
log.Fatal("can't set maxprocs", zap.Error(err))
}
log.Info("Starting log service")
config := configuration.NewLogServiceConfiguration()
conn, err := libs.NewPgConnection(ctx, config)
if err != nil {
log.Fatal("failed to connect to postgres", zap.Error(err))
}
lr := repository.NewLogRepository(conn)
server := server.NewLogServer(lr)
var listener net.Listener
listener, err = net.Listen("tcp", ":"+config.PORT)
if err != nil {
log.Fatal("failed to listen", zap.Error(err))
}
if err := rootCmd.Execute(); err != nil {
_, _ = fmt.Fprintln(os.Stderr, err)
os.Exit(1)
s := grpc.NewServer()
logservicepb.RegisterLogServiceServer(s, server)
log.Info("log service started", zap.String("address", listener.Addr().String()))
if err := s.Serve(listener); err != nil {
log.Fatal("failed to serve", zap.Error(err))
}
}
3 changes: 2 additions & 1 deletion go/database/log/db/copyfrom.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go/database/log/db/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions go/database/log/db/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ CREATE TABLE "public"."collection" (
CREATE TABLE "public"."record_log" (
"offset" bigint NOT NULL,
"collection_id" text NOT NULL,
"timestamp" integer NOT NULL DEFAULT (EXTRACT(epoch FROM now()))::integer,
"timestamp" bigint NOT NULL,
"record" bytea NOT NULL,
PRIMARY KEY ("collection_id", "offset")
);
4 changes: 2 additions & 2 deletions go/database/log/migrations/atlas.sum
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
h1:l718NRul/xO5Vz4nKzlWAR9ML+kOkn4TTgIlMQYcUZA=
20240401221053_initial.sql h1:RPywT3bZIeCHgfStvajW3fcDhqadDY5xI9MFjE/Un4U=
h1:kG+ejV1DS3youx+m5SNNFYabJeDqfYTdSQHbJtR2/eU=
20240404181827_initial.sql h1:xnoD1FcXImqQPJOvaDbTOwTGPLtCP3RibetuaaZeATI=
2 changes: 1 addition & 1 deletion go/database/log/queries/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ WHERE id = $1
FOR UPDATE;

-- name: InsertRecord :copyfrom
INSERT INTO record_log (collection_id, "offset", record) values($1, $2, $3);
INSERT INTO record_log (collection_id, "offset", record, timestamp) values($1, $2, $3, $4);

-- name: GetRecordsForCollection :many
SELECT * FROM record_log r WHERE r.collection_id = $1 AND r.offset >= $2 and r.timestamp <= $4 ORDER BY r.offset ASC limit $3 ;
Expand Down
2 changes: 1 addition & 1 deletion go/database/log/schema/record_log.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
CREATE TABLE record_log (
"offset" BIGINT NOT NULL,
collection_id text NOT NULL,
timestamp int NOT NULL default extract(epoch from now())::int,
timestamp BIGINT NOT NULL,
record bytea NOT NULL,
PRIMARY KEY(collection_id, "offset")
);
Expand Down
23 changes: 23 additions & 0 deletions go/pkg/log/configuration/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package configuration

import "os"

type LogServiceConfiguration struct {
PORT string
DATABASE_URL string
}

func getEnvWithDefault(key, defaultValue string) string {
value := os.Getenv(key)
if value == "" {
return defaultValue
}
return value
}

func NewLogServiceConfiguration() *LogServiceConfiguration {
return &LogServiceConfiguration{
PORT: getEnvWithDefault("PORT", "50051"),
DATABASE_URL: getEnvWithDefault("CHROMA_DATABASE_URL", "postgresql://chroma:[email protected]:5432/log"),
}
}
9 changes: 7 additions & 2 deletions go/pkg/log/repository/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
log "github.com/chroma-core/chroma/go/database/log/db"
"github.com/jackc/pgx/v5"
"time"
)

type LogRepository struct {
Expand All @@ -15,6 +16,9 @@ type LogRepository struct {
func (r *LogRepository) InsertRecords(ctx context.Context, collectionId string, records [][]byte) (insertCount int64, err error) {
var tx pgx.Tx
tx, err = r.conn.BeginTx(ctx, pgx.TxOptions{})
if err != nil {
return
}
var collection log.Collection
queriesWithTx := r.queries.WithTx(tx)
defer func() {
Expand Down Expand Up @@ -47,6 +51,7 @@ func (r *LogRepository) InsertRecords(ctx context.Context, collectionId string,
CollectionID: collectionId,
Record: record,
Offset: offset,
Timestamp: time.Now().UnixNano(),
}
}
insertCount, err = queriesWithTx.InsertRecord(ctx, params)
Expand All @@ -60,12 +65,12 @@ func (r *LogRepository) InsertRecords(ctx context.Context, collectionId string,
return
}

func (r *LogRepository) PullRecords(ctx context.Context, collectionId string, offset int64, batchSize int, timestamp int) (records []log.RecordLog, err error) {
func (r *LogRepository) PullRecords(ctx context.Context, collectionId string, offset int64, batchSize int, timestamp int64) (records []log.RecordLog, err error) {
records, err = r.queries.GetRecordsForCollection(ctx, log.GetRecordsForCollectionParams{
CollectionID: collectionId,
Offset: offset,
Limit: int32(batchSize),
Timestamp: int32(timestamp),
Timestamp: timestamp,
})
return
}
Expand Down
Loading

0 comments on commit dab637f

Please sign in to comment.