Skip to content
This repository has been archived by the owner on Nov 27, 2023. It is now read-only.

Commit

Permalink
perf: New neo4j csv publisher to improve performance using batched pa…
Browse files Browse the repository at this point in the history
…rams (amundsen-io#1957)

* New publisher using managed transactions, unwind statements, configurable two or one way relations

Signed-off-by: Kristen Armes <[email protected]>

* Add logic for preserving adhoc ui data, and move write transactions logic to reusable function

Signed-off-by: Kristen Armes <[email protected]>

* Fixing flake8 error (not sure how it got through, not from my change)

Signed-off-by: Kristen Armes <[email protected]>

* Pulling in a few of the latest changes from the original publisher and lint

Signed-off-by: Kristen Armes <[email protected]>

* Fix tests

Signed-off-by: Kristen Armes <[email protected]>

* Addressing PR feedback and change index creation to use a managed transaction

Signed-off-by: Kristen Armes <[email protected]>

* Refactor props body function and separate out constants

Signed-off-by: Kristen Armes <[email protected]>

* Addressing PR feedback plus refactoring

Signed-off-by: Kristen Armes <[email protected]>

* Addressing PR feedback, more refactoring

Signed-off-by: Kristen Armes <[email protected]>

* Minor fixes and bump version

Signed-off-by: Kristen Armes <[email protected]>

Signed-off-by: Kristen Armes <[email protected]>
  • Loading branch information
kristenarmes authored Aug 16, 2022
1 parent 01d6e8c commit 09f2d92
Show file tree
Hide file tree
Showing 7 changed files with 623 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ class HiveTableLastUpdatedExtractor(Extractor):
AND NOT EXISTS (SELECT * FROM PARTITION_KEYS WHERE PARTITION_KEYS.TBL_ID = TBLS.TBL_ID)
"""

DEFAULT_POSTGRES_ADDTIONAL_WHERE_CLAUSE = """ NOT EXISTS (SELECT * FROM "PARTITIONS" p WHERE p."TBL_ID" = t."TBL_ID")
AND NOT EXISTS (SELECT * FROM "PARTITION_KEYS" pk WHERE pk."TBL_ID" = t."TBL_ID")
DEFAULT_POSTGRES_ADDTIONAL_WHERE_CLAUSE = """ NOT EXISTS (SELECT * FROM "PARTITIONS" p
WHERE p."TBL_ID" = t."TBL_ID") AND NOT EXISTS (SELECT * FROM "PARTITION_KEYS" pk WHERE pk."TBL_ID" = t."TBL_ID")
"""

DATABASE = 'hive'
Expand Down
35 changes: 19 additions & 16 deletions databuilder/databuilder/publisher/neo4j_csv_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@

from databuilder.publisher.base_publisher import Publisher
from databuilder.publisher.neo4j_preprocessor import NoopRelationPreprocessor
from databuilder.publisher.publisher_config_constants import (
Neo4jCsvPublisherConfigs, PublishBehaviorConfigs, PublisherConfigs,
)

# Setting field_size_limit to solve the error below
# _csv.Error: field larger than field limit (131072)
Expand All @@ -32,53 +35,53 @@

# Config keys
# A directory that contains CSV files for nodes
NODE_FILES_DIR = 'node_files_directory'
NODE_FILES_DIR = PublisherConfigs.NODE_FILES_DIR
# A directory that contains CSV files for relationships
RELATION_FILES_DIR = 'relation_files_directory'
RELATION_FILES_DIR = PublisherConfigs.RELATION_FILES_DIR
# A end point for Neo4j e.g: bolt://localhost:9999
NEO4J_END_POINT_KEY = 'neo4j_endpoint'
NEO4J_END_POINT_KEY = Neo4jCsvPublisherConfigs.NEO4J_END_POINT_KEY
# A transaction size that determines how often it commits.
NEO4J_TRANSACTION_SIZE = 'neo4j_transaction_size'
NEO4J_TRANSACTION_SIZE = Neo4jCsvPublisherConfigs.NEO4J_TRANSACTION_SIZE
# A progress report frequency that determines how often it report the progress.
NEO4J_PROGRESS_REPORT_FREQUENCY = 'neo4j_progress_report_frequency'
# A boolean flag to make it fail if relationship is not created
NEO4J_RELATIONSHIP_CREATION_CONFIRM = 'neo4j_relationship_creation_confirm'

NEO4J_MAX_CONN_LIFE_TIME_SEC = 'neo4j_max_conn_life_time_sec'
NEO4J_MAX_CONN_LIFE_TIME_SEC = Neo4jCsvPublisherConfigs.NEO4J_MAX_CONN_LIFE_TIME_SEC

# list of nodes that are create only, and not updated if match exists
NEO4J_CREATE_ONLY_NODES = 'neo4j_create_only_nodes'
NEO4J_CREATE_ONLY_NODES = Neo4jCsvPublisherConfigs.NEO4J_CREATE_ONLY_NODES

# list of node labels that could attempt to be accessed simultaneously
NEO4J_DEADLOCK_NODE_LABELS = 'neo4j_deadlock_node_labels'

NEO4J_USER = 'neo4j_user'
NEO4J_PASSWORD = 'neo4j_password'
NEO4J_USER = Neo4jCsvPublisherConfigs.NEO4J_USER
NEO4J_PASSWORD = Neo4jCsvPublisherConfigs.NEO4J_PASSWORD
# in Neo4j (v4.0+), we can create and use more than one active database at the same time
NEO4J_DATABASE_NAME = 'neo4j_database'
NEO4J_DATABASE_NAME = Neo4jCsvPublisherConfigs.NEO4J_DATABASE_NAME

# NEO4J_ENCRYPTED is a boolean indicating whether to use SSL/TLS when connecting
NEO4J_ENCRYPTED = 'neo4j_encrypted'
NEO4J_ENCRYPTED = Neo4jCsvPublisherConfigs.NEO4J_ENCRYPTED
# NEO4J_VALIDATE_SSL is a boolean indicating whether to validate the server's SSL/TLS
# cert against system CAs
NEO4J_VALIDATE_SSL = 'neo4j_validate_ssl'
NEO4J_VALIDATE_SSL = Neo4jCsvPublisherConfigs.NEO4J_VALIDATE_SSL

# This will be used to provide unique tag to the node and relationship
JOB_PUBLISH_TAG = 'job_publish_tag'
JOB_PUBLISH_TAG = PublisherConfigs.JOB_PUBLISH_TAG

# any additional fields that should be added to nodes and rels through config
ADDITIONAL_FIELDS = 'additional_fields'
ADDITIONAL_FIELDS = PublisherConfigs.ADDITIONAL_PUBLISHER_METADATA_FIELDS

# Neo4j property name for published tag
PUBLISHED_TAG_PROPERTY_NAME = 'published_tag'
PUBLISHED_TAG_PROPERTY_NAME = PublisherConfigs.PUBLISHED_TAG_PROPERTY_NAME

# Neo4j property name for last updated timestamp
LAST_UPDATED_EPOCH_MS = 'publisher_last_updated_epoch_ms'
LAST_UPDATED_EPOCH_MS = PublisherConfigs.LAST_UPDATED_EPOCH_MS

# A boolean flag to indicate if publisher_metadata (e.g. published_tag,
# publisher_last_updated_epoch_ms)
# will be included as properties of the Neo4j nodes
ADD_PUBLISHER_METADATA = 'add_publisher_metadata'
ADD_PUBLISHER_METADATA = PublishBehaviorConfigs.ADD_PUBLISHER_METADATA

RELATION_PREPROCESSOR = 'relation_preprocessor'

Expand Down
Loading

0 comments on commit 09f2d92

Please sign in to comment.