Spark Sync (Databricks)

The SparkSync class provides automated, convention-based synchronisation between a Spark catalog (e.g. Databricks / Unity Catalog) and TitanRDM. It extends ConventionSync to automatically read from and write to Spark catalog tables.


Naming Convention

SparkSync follows a three-level naming convention for Spark tables:

{catalog}.{schema}.{domain_abbreviation}_{database_table_name}

For example:

- Download target: dev.rdmin.clin_sites
- Upload source: dev.rdmout.clin_sites
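The convention can be expressed as a small helper. This is a minimal sketch for illustration only: `qualified_table_name` is a hypothetical function, not part of the SDK, and SparkSync may apply further normalisation that is not described here.

```python
def qualified_table_name(catalog: str, schema: str,
                         domain_abbreviation: str,
                         database_table_name: str) -> str:
    """Build {catalog}.{schema}.{domain_abbreviation}_{database_table_name}."""
    return f"{catalog}.{schema}.{domain_abbreviation}_{database_table_name}"


# Values chosen to reproduce the two example names above
print(qualified_table_name("dev", "rdmin", "clin", "sites"))   # dev.rdmin.clin_sites
print(qualified_table_name("dev", "rdmout", "clin", "sites"))  # dev.rdmout.clin_sites
```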


Setup

from titan_rdm_sdk import TitanRDMClient
from titan_rdm_sdk.spark_sync import SparkSync

# Authenticate
client = TitanRDMClient(
    url=TITAN_URL,
    client_id=TITAN_CLIENT_ID,
    client_secret=TITAN_CLIENT_SECRET,
)

# Resolve branch
branch = client.get_branch_by_name("prod")

# Create SparkSync, passing the SparkSession (the predefined `spark` variable in Databricks)
sync = SparkSync(client=client, spark=spark)

In Databricks, the spark variable is available globally. In other Spark environments, pass your SparkSession explicitly.


Upload: Spark Catalog → TitanRDM

Upload an Entire Domain

Upload all deployed tables in a domain. SparkSync reads each table from {catalog}.{schema}.{domain_abbreviation}_{database_table_name} and uploads it to TitanRDM:

results = sync.upload_sync_by_convention(
    branch_id=branch.id,
    source_catalog="dev",
    source_schema="rdmout",
    target_domain_name="Clinics",
)

for r in results:
    print(f"  {r['table']}: {r['rows']} rows — {r['status']}")
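When a domain contains many tables, it can help to group the results by status rather than scanning the printed lines. The sketch below assumes only the `table`, `rows` and `status` keys used in the loop above. The status strings in the sample data are invented for illustration, and `summarise_results` is a hypothetical helper, not an SDK function.

```python
from collections import defaultdict


def summarise_results(results):
    """Group table names by status and total the row counts."""
    tables_by_status = defaultdict(list)
    total_rows = 0
    for r in results:
        tables_by_status[r["status"]].append(r["table"])
        # Treat a missing or None row count as zero
        total_rows += r.get("rows") or 0
    return dict(tables_by_status), total_rows


# Illustrative data only; real status values may differ
sample = [
    {"table": "Site", "rows": 120, "status": "completed"},
    {"table": "Org Unit", "rows": 0, "status": "failed"},
]
by_status, total = summarise_results(sample)
print(by_status, total)
```

The same helper applies unchanged to the list returned by a download, since both directions are shown returning the same keys.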

Upload Specific Tables

Upload only selected tables from the domain:

results = sync.upload_sync_by_convention(
    branch_id=branch.id,
    source_catalog="dev",
    source_schema="rdmout",
    target_domain_name="Clinics",
    target_table_names=["Site", "Delivery Centre", "Org Unit"],
)

Upload Parameters

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| branch_id | int | Yes | Target branch ID |
| source_catalog | str | Yes | Source catalog name (e.g. 'dev') |
| source_schema | str | Yes | Source schema name (e.g. 'rdmout') |
| target_domain_name | str | Yes | Exact domain name in TitanRDM |
| target_table_names | list[str] | No | Filter to specific table names |
| description | str | No | Import batch description |
| correlation_code | str | No | Tracking identifier |

Download: TitanRDM → Spark Catalog

Download an Entire Domain

Download all deployed tables in a domain and write them to your Spark catalog:

results = sync.download_sync_by_convention(
    branch_id=branch.id,
    target_catalog="dev",
    target_schema="rdmin",
    source_domain_name="Clinics",
)

for r in results:
    print(f"  {r['table']}: {r['rows']} rows — {r['status']}")

Download Specific Tables

Download only selected tables from the domain:

results = sync.download_sync_by_convention(
    branch_id=branch.id,
    target_catalog="dev",
    target_schema="rdmin",
    source_domain_name="Clinics",
    source_table_names=["Site", "Delivery Centre", "Org Unit"],
)

Download Parameters

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| branch_id | int | Yes | Target branch ID |
| target_catalog | str | Yes | Destination catalog (e.g. 'dev') |
| target_schema | str | Yes | Destination schema (e.g. 'rdmin') |
| source_domain_name | str | Yes | Exact domain name in TitanRDM |
| source_table_names | list[str] | No | Filter to specific table names |
| correlation_code | str | No | Tracking identifier prefix |
| poll_interval | float | No | Seconds between export checks (default: 2.0) |
| max_wait | float | No | Max seconds to wait per export (default: 300.0) |
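To show how poll_interval and max_wait interact, here is a generic polling loop. It is a sketch of the general pattern under stated assumptions, not the SDK's actual implementation. `wait_for_export` and `is_ready` are hypothetical names, and raising TimeoutError on expiry is an assumption about behaviour.

```python
import time


def wait_for_export(is_ready, poll_interval=2.0, max_wait=300.0,
                    sleep=time.sleep, clock=time.monotonic):
    """Call is_ready() every poll_interval seconds until it returns True.

    Raises TimeoutError if the export is still not ready once
    max_wait seconds have elapsed.
    """
    deadline = clock() + max_wait
    while True:
        if is_ready():
            return True
        if clock() >= deadline:
            raise TimeoutError(f"export not ready after {max_wait} seconds")
        sleep(poll_interval)


# Demo with a fake export that becomes ready on the third check
checks = iter([False, False, True])
print(wait_for_export(lambda: next(checks), poll_interval=0.01, max_wait=1.0))
```

With the defaults, a pattern like this would check roughly every 2 seconds and give up after about 5 minutes per export. A shorter poll_interval reduces latency at the cost of more status requests.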

Prerequisites

Before running SparkSync:

  1. Create schemas in your catalog:

         CREATE SCHEMA IF NOT EXISTS dev.rdmin;
         CREATE SCHEMA IF NOT EXISTS dev.rdmout;

  2. Populate the upload source tables in rdmout, naming each table to match the corresponding TitanRDM database_table_name value (see Naming Convention).

  3. Install the SDK in your cluster:

         %pip install titan-rdm-sdk

  4. Store credentials in a Databricks secret scope:

         databricks secrets create-scope --scope titan-rdm
         databricks secrets put --scope titan-rdm --key url
         databricks secrets put --scope titan-rdm --key client_id
         databricks secrets put --scope titan-rdm --key client_secret
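Before an upload, it may be worth checking that every expected source table exists in the upload schema. The sketch below is a plain-Python pre-flight check. `missing_source_tables` is a hypothetical helper, the table names are invented for illustration, and the case-insensitive comparison is an assumption based on Spark's default handling of identifiers.

```python
def missing_source_tables(existing_tables, domain_abbreviation, database_table_names):
    """Return expected source table names that are absent from the schema."""
    # Compare case-insensitively
    existing = {name.lower() for name in existing_tables}
    expected = [f"{domain_abbreviation}_{name}" for name in database_table_names]
    return [name for name in expected if name.lower() not in existing]


# In a notebook, the existing names could be collected with:
#   [row.tableName for row in spark.sql("SHOW TABLES IN dev.rdmout").collect()]
existing = ["clin_sites", "clin_org_units"]
print(missing_source_tables(existing, "clin", ["sites", "delivery_centres", "org_units"]))
# ['clin_delivery_centres']
```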


Complete Example

from titan_rdm_sdk import TitanRDMClient
from titan_rdm_sdk.spark_sync import SparkSync

# Configuration
TITAN_URL = dbutils.secrets.get(scope="titan-rdm", key="url")
TITAN_CLIENT_ID = dbutils.secrets.get(scope="titan-rdm", key="client_id")
TITAN_CLIENT_SECRET = dbutils.secrets.get(scope="titan-rdm", key="client_secret")

CATALOG = "dev"
DOWNLOAD_SCHEMA = "rdmin"
UPLOAD_SCHEMA = "rdmout"

# Initialise
client = TitanRDMClient(url=TITAN_URL, client_id=TITAN_CLIENT_ID, client_secret=TITAN_CLIENT_SECRET)
branch = client.get_branch_by_name("prod")
sync = SparkSync(client=client, spark=spark)

# Download all Clinics tables → dev.rdmin
download_results = sync.download_sync_by_convention(
    branch_id=branch.id,
    target_catalog=CATALOG,
    target_schema=DOWNLOAD_SCHEMA,
    source_domain_name="Clinics",
)

# Upload all Clinics tables from dev.rdmout → TitanRDM
upload_results = sync.upload_sync_by_convention(
    branch_id=branch.id,
    source_catalog=CATALOG,
    source_schema=UPLOAD_SCHEMA,
    target_domain_name="Clinics",
)

SparkSync vs Manual Convention Sync

| Feature | Manual (ConventionSync) | SparkSync |
| --- | --- | --- |
| Catalog read/write | Manual spark.table() / .saveAsTable() | Automatic |
| Table filtering | Manual loop logic | Pass target_table_names / source_table_names |
| Batch management | Manual get_upload() / complete() | Handled internally |
| Lines of code | ~60 per direction | ~5 per direction |

Widgets for Parameterised Notebooks

Use Databricks widgets to make your sync notebooks configurable:

dbutils.widgets.text("branch_name", "prod", "Branch Name")
dbutils.widgets.text("download_schema", "rdmin", "Download Schema")
dbutils.widgets.text("upload_schema", "rdmout", "Upload Schema")
dbutils.widgets.text("catalog", "hive_metastore", "Catalog")

BRANCH_NAME = dbutils.widgets.get("branch_name")
DOWNLOAD_SCHEMA = dbutils.widgets.get("download_schema")
UPLOAD_SCHEMA = dbutils.widgets.get("upload_schema")
CATALOG = dbutils.widgets.get("catalog")
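Widget values are always strings, so a table filter has to be parsed before it can be passed as target_table_names or source_table_names. The sketch below assumes a hypothetical extra text widget named table_names that holds a comma-separated list. It also assumes that passing None is equivalent to omitting the optional filter, which the parameter tables do not confirm.

```python
def parse_table_names(raw: str):
    """Split a comma-separated widget value into a list of table names.

    Returns None for an empty value, meaning "no filter".
    """
    names = [part.strip() for part in raw.split(",") if part.strip()]
    return names or None


print(parse_table_names("Site, Delivery Centre, Org Unit"))
# ['Site', 'Delivery Centre', 'Org Unit']
print(parse_table_names(""))
# None
```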

Example Notebook

For a complete working example, see the SparkSync Example Notebook.


Next Steps