Skip to content

Commit

Permalink
StarRocks#5463 Add dbt-connector for StarRocks (StarRocks#13190)
Browse files Browse the repository at this point in the history
  • Loading branch information
motto1314 authored Nov 9, 2022
1 parent c8a6543 commit d17f0df
Show file tree
Hide file tree
Showing 27 changed files with 1,116 additions and 0 deletions.
3 changes: 3 additions & 0 deletions contrib/dbt-connector/MANIFEST.in
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# This file is licensed under the Elastic License 2.0. Copyright 2021-present, StarRocks Inc.

recursive-include dbt/include *.sql *.yml *.md
65 changes: 65 additions & 0 deletions contrib/dbt-connector/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# dbt-starrocks

This project is **under development**.


The `dbt-starrocks` package contains all of the code enabling dbt to work with a StarRocks database. For
more information on using dbt with StarRocks, see the examples below.

## Getting started
Configure your environment:

- Python: 3.7.4
- StarRocks: 2.4.0+
- DBT: 1.1.0

Place `dbt-starrocks` in the `plugin` directory, then install it:
```
pip install .
```

Create your project:
```
dbt init
```

## Basic Example
### dbt seed properties(yml):
#### Minimum configuration:
```
config:
distributed_by: ['id']
```

#### Complete configuration:
```
config:
engine: 'OLAP'
keys: ['id', 'name', 'some_date']
table_type: 'PRIMARY' # PRIMARY or DUPLICATE or UNIQUE
distributed_by: ['id']
buckets: 3 # default 10
partition_by: ['some_date']
partition_by_init: ["PARTITION p1 VALUES [('1971-01-01 00:00:00'), ('1991-01-01 00:00:00')),PARTITION p1972 VALUES [('1991-01-01 00:00:00'), ('1999-01-01 00:00:00'))"]
properties: {"replication_num":"1", "in_memory": "true"}
```

### dbt run config(table/incremental):
#### Minimum configuration:
```
{{ config(materialized=var("materialized_var", "table"), distributed_by=['id'])}}
{{ config(materialized='incremental', distributed_by=['id']) }}
```

#### Complete configuration:
```
{{ config(materialized='table', engine='OLAP', buckets=32, distributed_by=['id'], properties={"in_memory": "true"}) }}
{{ config(materialized='incremental', engine='OLAP', buckets=32, distributed_by=['id'], properties={"in_memory": "true"}) }}
```

## Test Adapter
To test the adapter, consult [the dbt-adapter-tests project](https://github.com/dbt-labs/dbt-adapter-tests).

## Notice
1. `Create table as` can only set engine='OLAP' and table_type='DUPLICATE'
2. `distributed_by` is required.
14 changes: 14 additions & 0 deletions contrib/dbt-connector/dbt/adapters/starrocks/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#! /usr/bin/python3
# This file is licensed under the Elastic License 2.0. Copyright 2021-present, StarRocks Inc.

from dbt.adapters.starrocks.connections import StarRocksCredentials
from dbt.adapters.starrocks.impl import StarRocksAdapter

from dbt.adapters.base import AdapterPlugin
from dbt.include import starrocks


# Bundle the StarRocks adapter class, its credentials class and the path of
# the bundled include package into a single AdapterPlugin object.
Plugin = AdapterPlugin(
    adapter=StarRocksAdapter,
    credentials=StarRocksCredentials,
    include_path=starrocks.PACKAGE_PATH,
)
4 changes: 4 additions & 0 deletions contrib/dbt-connector/dbt/adapters/starrocks/__version__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#! /usr/bin/python3
# This file is licensed under the Elastic License 2.0. Copyright 2021-present, StarRocks Inc.

# Version string of this adapter package.
version = "1.0.0"
53 changes: 53 additions & 0 deletions contrib/dbt-connector/dbt/adapters/starrocks/column.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#! /usr/bin/python3
# This file is licensed under the Elastic License 2.0. Copyright 2021-present, StarRocks Inc.

from dataclasses import dataclass

from dbt.adapters.base.column import Column

@dataclass
class StarRocksColumn(Column):
    """Column description with StarRocks quoting and type classification.

    The ``is_*`` helpers compare the lower-cased ``dtype`` against a set of
    generic SQL type names extended with the StarRocks-specific spellings.
    """

    @property
    def quoted(self) -> str:
        """Return the column name wrapped in backticks."""
        return "`{}`".format(self.column)

    def __repr__(self) -> str:
        return f"<StarRocksColumn {self.name} ({self.data_type})>"

    def is_string(self) -> bool:
        """Return True when ``dtype`` names a character/string type."""
        return self.dtype.lower() in {
            "text",
            "character varying",
            "character",
            "varchar",
            # starrocks
            "char",
            "string",
        }

    def is_float(self) -> bool:
        """Return True when ``dtype`` names a floating-point type."""
        return self.dtype.lower() in {
            # floats
            "real",
            "float4",
            "float",
            "double precision",
            "float8",
            # starrocks
            "double",
        }

    def is_integer(self) -> bool:
        """Return True when ``dtype`` names an integer type."""
        return self.dtype.lower() in {
            # real types
            "smallint",
            "integer",
            "bigint",
            "smallserial",
            "serial",
            "bigserial",
            # aliases
            "int2",
            "int4",
            "int8",
            "serial2",
            "serial4",
            "serial8",
            # starrocks
            "largeint",
            "tinyint",
            # StarRocks spells its 4-byte integer type INT; without this
            # entry plain INT columns were not classified as integers.
            "int",
        }
163 changes: 163 additions & 0 deletions contrib/dbt-connector/dbt/adapters/starrocks/connections.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
#! /usr/bin/python3
# This file is licensed under the Elastic License 2.0. Copyright 2021-present, StarRocks Inc.

from contextlib import contextmanager

import mysql.connector

import dbt.exceptions
from dataclasses import dataclass

from dbt.adapters.base import Credentials
from dbt.adapters.sql import SQLConnectionManager
from dbt.contracts.connection import AdapterResponse
from dbt.contracts.connection import Connection
from dbt.events import AdapterLogger
from typing import Optional

logger = AdapterLogger("starrocks")

@dataclass
class StarRocksCredentials(Credentials):
    """Connection settings for a StarRocks target.

    Every field is optional at the dataclass level and defaults to ``None``.
    ``database`` and ``schema`` refer to the same thing on StarRocks, so
    ``database`` must be left unset or be equal to ``schema``.
    """

    host: Optional[str] = None
    port: Optional[int] = None
    database: Optional[str] = None
    schema: Optional[str] = None
    username: Optional[str] = None
    password: Optional[str] = None
    charset: Optional[str] = None

    def __init__(self, **kwargs):
        """Store every keyword argument as an attribute, then validate.

        Raises:
            dbt.exceptions.RuntimeException: if ``database`` is set and
                differs from ``schema``.
        """
        for k, v in kwargs.items():
            setattr(self, k, v)
        # @dataclass keeps a hand-written __init__ instead of generating one,
        # and only the generated __init__ calls __post_init__. Call it
        # explicitly so the database/schema validation actually runs.
        self.__post_init__()

    def __post_init__(self):
        """Reject configurations whose ``database`` differs from ``schema``."""
        # starrocks classifies database and schema as the same thing
        if (
            self.database is not None and
            self.database != self.schema
        ):
            raise dbt.exceptions.RuntimeException(
                f" schema: {self.schema} \n"
                f" database: {self.database} \n"
                f"On StarRocks, database must be omitted or have the same value as"
                f" schema."
            )

    @property
    def type(self):
        """Adapter type identifier."""
        return 'starrocks'

    @property
    def unique_field(self):
        """Field used to identify this set of credentials (the schema)."""
        return self.schema

    def _connection_keys(self):
        """
        Returns an iterator of keys to pretty-print in 'dbt debug'
        """
        return (
            "host",
            "port",
            "database",
            "schema",
            "username",
        )

class StarRocksConnectionManager(SQLConnectionManager):
    """Connection manager that reaches StarRocks through mysql-connector."""

    TYPE = 'starrocks'

    @classmethod
    def open(cls, connection):
        """Open ``connection`` unless it is already open, and return it.

        The first attempt uses the configured schema as the default database.
        If that fails, a second attempt is made against
        ``information_schema``.
        NOTE(review): presumably the fallback exists so dbt can still connect
        when the target schema has not been created yet; confirm.

        Raises:
            dbt.exceptions.FailedToConnectException: if both attempts fail.
        """
        if connection.state == 'open':
            logger.debug('Connection is already open, skipping open.')
            return connection

        credentials = cls.get_credentials(connection.credentials)
        kwargs = {
            "host": credentials.host,
            "username": credentials.username,
            "password": credentials.password,
            "database": credentials.schema,
        }

        if credentials.port:
            kwargs["port"] = credentials.port

        try:
            connection.handle = mysql.connector.connect(**kwargs)
            connection.state = 'open'
        except mysql.connector.Error as first_error:
            # The first attempt DID supply a database (the schema); report
            # that accurately and keep the original error visible in the log.
            logger.debug(
                "Failed to connect using database '{}': '{}'. "
                "Trying again with database `information_schema`."
                .format(credentials.schema, first_error)
            )

            try:
                kwargs["database"] = "information_schema"

                connection.handle = mysql.connector.connect(**kwargs)
                connection.state = 'open'
            except mysql.connector.Error as e:
                logger.debug("Got an error when attempting to open a StarRocks "
                             "connection: '{}'"
                             .format(e))

                connection.handle = None
                connection.state = 'fail'

                raise dbt.exceptions.FailedToConnectException(str(e)) from e

        return connection

    @classmethod
    def get_credentials(cls, credentials):
        """Return ``credentials`` unchanged."""
        return credentials

    def cancel(self, connection: Connection):
        """Cancel work on ``connection`` by closing its underlying handle."""
        connection.handle.close()

    @contextmanager
    def exception_handler(self, sql):
        """Translate errors raised while running ``sql`` into dbt exceptions.

        Driver ``DatabaseError``s become ``dbt.exceptions.DatabaseException``;
        dbt ``RuntimeException``s are re-raised untouched; anything else is
        wrapped in ``dbt.exceptions.RuntimeException``. A rollback is
        attempted before raising in every case.
        """
        try:
            yield

        except mysql.connector.DatabaseError as e:
            logger.debug('StarRocks error: {}'.format(str(e)))

            try:
                self.rollback_if_open()
            except mysql.connector.Error:
                # Best effort: the original database error is what matters.
                logger.debug("Failed to release connection!")

            raise dbt.exceptions.DatabaseException(str(e).strip()) from e

        except Exception as e:
            logger.debug("Error running SQL: {}", sql)
            logger.debug("Rolling back transaction.")
            self.rollback_if_open()
            if isinstance(e, dbt.exceptions.RuntimeException):
                # during a sql query, an internal to dbt exception was raised.
                # this sounds a lot like a signal handler and probably has
                # useful information, so raise it without modification.
                raise

            raise dbt.exceptions.RuntimeException(e) from e

    @classmethod
    def get_response(cls, cursor) -> AdapterResponse:
        """Build an ``AdapterResponse`` from ``cursor``.

        The code is always ``"SUCCESS"``; the row count is taken from
        ``cursor.rowcount`` when available and is 0 otherwise.
        """
        code = "SUCCESS"
        num_rows = 0

        if cursor is not None and cursor.rowcount is not None:
            num_rows = cursor.rowcount

        # There's no real way to get the status from the mysql-connector-python driver.
        # So just return the default value.
        return AdapterResponse(
            _message="{} {}".format(code, num_rows),
            rows_affected=num_rows,
            code=code
        )
Loading

0 comments on commit d17f0df

Please sign in to comment.