WIP: Postgres conversion
mkouhia committed Jan 2, 2025
1 parent 57b0e6d commit 727b481
Showing 15 changed files with 1,177 additions and 821 deletions.
759 changes: 723 additions & 36 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
@@ -5,7 +5,6 @@ edition = "2021"

[dependencies]
rumqttc = { version = "0.24.0", features = ["use-rustls", "websocket"] }
rusqlite = { version = "0.32.1", features = ["bundled"] }
tokio = { version = "1", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
@@ -15,6 +14,8 @@ thiserror = "2.0.7"
config = "0.15.4"
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
serde_with = "3.11.0"
sqlx = { version = "0.8.2", features = ["runtime-tokio", "postgres", "chrono", "macros", "migrate", "tls-rustls-ring"] }

[dev-dependencies]
dotenvy = "0.15.7"
tempfile = "3.3"
31 changes: 31 additions & 0 deletions README.md
@@ -4,3 +4,34 @@

- `RUST_LOG` determines log level.
- Use `RUST_LOG='ais_recorder=trace'` for more verbose output

## Database

# install sqlx-cli with only the Postgres feature
$ cargo install sqlx-cli --no-default-features --features postgres

Provide the database URL in the `DATABASE_URL` environment variable.

export DATABASE_URL=postgres://username:password@localhost/ais-recorder

Create the database schema:

sqlx database create
sqlx migrate run

Create and run migrations:

sqlx migrate add <name>
sqlx migrate run
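
The same migrations can also be applied from the application itself at startup. A minimal sketch using the `sqlx` features enabled in `Cargo.toml` (pool size and error handling here are illustrative, not code from this commit):

```rust
use sqlx::postgres::PgPoolOptions;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Same DATABASE_URL environment variable that sqlx-cli uses
    let url = std::env::var("DATABASE_URL")?;

    // A small pool is enough for a single recorder process
    let pool = PgPoolOptions::new()
        .max_connections(5)
        .connect(&url)
        .await?;

    // Embed ./migrations into the binary and apply any pending ones
    sqlx::migrate!("./migrations").run(&pool).await?;

    Ok(())
}
```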

## Development environment

### PostgreSQL

1. Start the TimescaleDB container:

docker-compose up -d

2. Connect to the database with the `psql` executable inside the container:

docker exec -it db psql -U ais-recorder
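
The container can also be smoke-tested from Rust instead of `psql`; this is a hypothetical check (the connection string and query are assumptions, not part of this commit):

```rust
use sqlx::PgPool;

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    // Credentials come from .env in the development setup
    let pool =
        PgPool::connect("postgres://ais-recorder:password@localhost/ais-recorder").await?;

    // The extension catalog confirms TimescaleDB is actually loaded
    let version: Option<String> =
        sqlx::query_scalar("SELECT extversion FROM pg_extension WHERE extname = 'timescaledb'")
            .fetch_optional(&pool)
            .await?;

    println!("timescaledb version: {version:?}");
    Ok(())
}
```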
24 changes: 2 additions & 22 deletions config/default.toml
@@ -5,25 +5,5 @@ topics = ["vessels-v2/+/location", "vessels-v2/+/metadata"]
client_id = "ais-logger-client"

[database]
# Path to the database file
path = "./ais-recorder.db"
# Maximum number of messages to batch before writing
batch_size = 1000
# Interval (in seconds) to flush database even if batch is not full
flush_interval = 10

[export]
# Data export interval, in Croner expression
# https://github.com/Hexagon/croner-rust
#
# +---------------- second (0 - 59)
# | +-------------- minute (0 - 59)
# | | +------------ hour (0 - 23)
# | | | +---------- day of month (1 - 31)
# | | | | +-------- month (1 - 12, JAN-DEC)
# | | | | | +------ day of week (0 - 6, SUN-SAT)
# | | | | | | (0 to 6 are Sunday to Saturday; 7 is Sunday, the same as 0)
# | | | | | |
cron = "0 5 4 * * *"
# Output folder
directory = "./data"
# URL to the database
url = "postgres://ais-recorder:sika-limppu-naulakkoon-kastehelmi-farsiksi@localhost/ais-recorder"
24 changes: 24 additions & 0 deletions docker-compose.yml
@@ -0,0 +1,24 @@
version: "3.9"

services:
db:
container_name: db
image: timescale/timescaledb:latest-pg17
env_file: ".env"
restart: always
# set shared memory limit when using docker-compose
shm_size: 128mb
# or set shared memory limit when deploy via swarm stack
#volumes:
# - type: tmpfs
# target: /dev/shm
# tmpfs:
# size: 134217728 # 128*2^20 bytes = 128 MiB
ports:
- "127.0.0.1:5432:5432/tcp"

# adminer:
# image: adminer
# restart: always
# ports:
# - 8080:8080
69 changes: 69 additions & 0 deletions migrations/20250101063544_initial-schema.sql
@@ -0,0 +1,69 @@
-- Create tables for AIS data
CREATE TABLE locations (
mmsi INTEGER NOT NULL, -- Raw MMSI (max 9 digits)
time TIMESTAMPTZ NOT NULL, -- Message time, microsecond precision (source data has one-second resolution)
sog REAL, -- Speed over ground in knots
cog REAL, -- Course over ground in degrees
nav_stat SMALLINT, -- Raw navigation status (0-15)
rot SMALLINT, -- Raw ROT value (-128 to 127)
pos_acc BOOLEAN NOT NULL, -- Position accuracy flag
raim BOOLEAN NOT NULL, -- RAIM flag
heading SMALLINT, -- Raw heading (0-359, 511)
lon DOUBLE PRECISION NOT NULL, -- Longitude in decimal degrees
lat DOUBLE PRECISION NOT NULL, -- Latitude in decimal degrees
PRIMARY KEY (mmsi, time)
);

CREATE TABLE metadata (
mmsi INTEGER NOT NULL, -- Raw MMSI
time TIMESTAMPTZ NOT NULL, -- Message time (source data: Unix timestamp in milliseconds)
name VARCHAR(20), -- Vessel name (max 20 chars)
destination VARCHAR(20), -- Destination (max 20 chars)
vessel_type SMALLINT, -- Raw vessel type
call_sign VARCHAR(7), -- Call sign (max 7 chars)
imo INTEGER, -- IMO number (7 digits)
draught SMALLINT, -- Raw draught value (0-255)
eta INTEGER, -- Raw ETA bits (20-bit packed format)
pos_type SMALLINT, -- Raw position fixing device type
ref_a SMALLINT, -- Raw dimension A
ref_b SMALLINT, -- Raw dimension B
ref_c SMALLINT, -- Raw dimension C
ref_d SMALLINT, -- Raw dimension D
PRIMARY KEY (mmsi, time)
);

-- Convert tables to hypertables
SELECT
create_hypertable ('locations', by_range ('time', INTERVAL '1 day'));

SELECT
create_hypertable ('metadata', by_range ('time', INTERVAL '1 day'));

-- Enable compression
ALTER TABLE locations
SET
(
timescaledb.compress,
timescaledb.compress_orderby = 'time DESC',
timescaledb.compress_segmentby = 'mmsi'
);

ALTER TABLE metadata
SET
(
timescaledb.compress,
timescaledb.compress_orderby = 'time DESC',
timescaledb.compress_segmentby = 'mmsi'
);

-- Add compression policies
SELECT
add_compression_policy ('locations', INTERVAL '7 days');

SELECT
add_compression_policy ('metadata', INTERVAL '7 days');

-- Create indices
CREATE INDEX idx_loca_mmsi_ts ON locations (mmsi, time DESC);

CREATE INDEX idx_meta_mmsi_ts ON metadata (mmsi, time DESC);
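
For reference, a write against this schema from the application side could look like the following sketch (the values, helper name, and conflict handling are illustrative, not code from this commit; `chrono` is assumed as a direct dependency):

```rust
use chrono::{TimeZone, Utc};
use sqlx::PgPool;

async fn insert_location(pool: &PgPool) -> Result<(), sqlx::Error> {
    // DO NOTHING keeps re-delivered MQTT messages from violating
    // the (mmsi, time) primary key
    sqlx::query(
        "INSERT INTO locations
             (mmsi, time, sog, cog, nav_stat, rot, pos_acc, raim, heading, lon, lat)
         VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
         ON CONFLICT (mmsi, time) DO NOTHING",
    )
    .bind(230_123_456_i32) // mmsi: INTEGER
    .bind(Utc.timestamp_millis_opt(1_735_800_000_000).unwrap()) // time: TIMESTAMPTZ
    .bind(12.3_f32) // sog: REAL
    .bind(245.0_f32) // cog: REAL
    .bind(0_i16) // nav_stat: SMALLINT
    .bind(0_i16) // rot: SMALLINT
    .bind(true) // pos_acc: BOOLEAN
    .bind(false) // raim: BOOLEAN
    .bind(245_i16) // heading: SMALLINT
    .bind(24.95_f64) // lon: DOUBLE PRECISION
    .bind(60.17_f64) // lat: DOUBLE PRECISION
    .execute(pool)
    .await?;
    Ok(())
}
```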
96 changes: 9 additions & 87 deletions src/config.rs
@@ -1,14 +1,7 @@
//! Application configuration
use std::path::{Path, PathBuf};
use std::time::Duration;

use config::{Config, ConfigError, Environment, File};
use serde::Deserialize;
use serde_with::serde_as;
use tracing::warn;

use crate::errors::AisLoggerError;

#[derive(Debug, Deserialize, Clone)]
pub struct AppConfig {
@@ -23,12 +16,9 @@ pub struct MqttConfig {
pub client_id: String,
}

#[serde_as]
#[derive(Debug, Deserialize, Clone)]
pub struct DatabaseConfig {
pub path: PathBuf,
#[serde_as(as = "serde_with::DurationSeconds<u64>")]
pub flush_interval: Duration,
pub url: String,
}

impl AppConfig {
@@ -49,48 +39,6 @@ impl AppConfig {
}
}

impl DatabaseConfig {
/// Validate configuration parameters
pub fn validate(&self) -> Result<(), AisLoggerError> {
self.validate_path()?;
self.validate_flush_interval()?;
self.ensure_directory_exists(self.path.parent().ok_or_else(|| {
AisLoggerError::ConfigurationError {
message: "Could not get parent directory".to_string(),
}
})?)?;
Ok(())
}

fn validate_path(&self) -> Result<(), AisLoggerError> {
if self.path.to_str().unwrap_or("").is_empty() {
return Err(AisLoggerError::ConfigurationError {
message: "Database path cannot be empty".to_string(),
});
}
Ok(())
}

fn validate_flush_interval(&self) -> Result<(), AisLoggerError> {
if self.flush_interval.is_zero() {
return Err(AisLoggerError::ConfigurationError {
message: "Flush interval must be greater than zero".to_string(),
});
}
Ok(())
}

fn ensure_directory_exists(&self, dir: &Path) -> Result<(), AisLoggerError> {
if !dir.exists() {
warn!("Database directory does not exist, attempting to create it");
std::fs::create_dir_all(dir).map_err(|e| AisLoggerError::ConfigurationError {
message: format!("Could not create database directory: {}", e),
})?;
}
Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;
@@ -101,44 +49,18 @@ mod tests {
env::set_var("AISLOGGER__MQTT__URI", "mqtt://localhost");
env::set_var("AISLOGGER__MQTT__TOPICS", "topic1,topic2");
env::set_var("AISLOGGER__MQTT__CLIENT_ID", "test_client");
env::set_var("AISLOGGER__DATABASE__PATH", "/tmp/test.db");
env::set_var("AISLOGGER__DATABASE__FLUSH_INTERVAL", "10");
env::set_var(
"AISLOGGER__DATABASE__URL",
"postgres://username:password@localhost/ais-recorder",
);

let config = AppConfig::load().unwrap();
assert_eq!(config.mqtt.uri, "mqtt://localhost");
assert_eq!(config.mqtt.topics, vec!["topic1", "topic2"]);
assert_eq!(config.mqtt.client_id, "test_client");
assert_eq!(config.database.path, PathBuf::from("/tmp/test.db"));
assert_eq!(config.database.flush_interval, Duration::from_secs(10));
}

#[test]
fn test_database_config_validate() {
let config = DatabaseConfig {
path: PathBuf::from("/tmp/test.db"),
flush_interval: Duration::from_secs(10),
};

assert!(config.validate().is_ok());
}

#[test]
fn test_database_config_validate_invalid_path() {
let config = DatabaseConfig {
path: PathBuf::from(""),
flush_interval: Duration::from_secs(10),
};

assert!(config.validate().is_err());
}

#[test]
fn test_database_config_validate_invalid_flush_interval() {
let config = DatabaseConfig {
path: PathBuf::from("/tmp/test.db"),
flush_interval: Duration::from_secs(0),
};

assert!(config.validate().is_err());
assert_eq!(
config.database.url,
String::from("postgres://username:password@localhost/ais-recorder")
);
}
}
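
Downstream of this diff, the trimmed-down `DatabaseConfig` leaves pool construction to the caller; the expected wiring is roughly the following sketch (the function name is an assumption, not code from this commit):

```rust
use sqlx::PgPool;

async fn connect(config: &AppConfig) -> Result<PgPool, sqlx::Error> {
    // The database section now carries only the connection URL
    PgPool::connect(&config.database.url).await
}
```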