Uploading Data

Upload data from pandas DataFrames to TitanRDM. The SDK supports full and incremental upload patterns, and handles multi-table imports in a single batch.


Note: If you are using Databricks or another Spark-based environment, consider using the SparkSync class instead of manually creating uploads. The SparkSync class provides a more convenient way to upload data by automatically discovering tables and creating uploads for them. Note: SparkSync class uses the ConventionSync class internally. You can use ConventionSync directly with pandas DataFrames if you prefer more control over the upload process.

Concepts

TermDescription
Upload (Import Batch)A container for one or more table uploads. Created once, completed after all tables are sent.
Table UploadA single table's data within an import batch. Each table upload targets a specific table definition and import mapping.
Patternfull replaces all data; incremental adds/updates rows; incremental_with_delete adds/updates/removes rows.
Import MappingDefines how your source columns map to the target table's columns.

Full Upload Workflow

import pandas as pd
from titan_rdm_sdk import TitanRDMClient

client = TitanRDMClient(
    url="https://your-tenant.titanrdm.com",
    client_id="your_client_id",
    client_secret="your_client_secret",
)

# Step 1: Create an import batch
upload = client.get_upload(
    branch_id=174,
    description="Customer data update",
    correlation_code="batch-001",
)
print(f"Import batch created: ID={upload.id}, Status={upload.status}")

# Step 2: Create a table upload
table_upload = upload.get_table_upload(
    table_definition_key=456,
    import_mapping_key=123,
    pattern="full",
)
print(f"Table upload created: ID={table_upload.id}")

# Step 3: Send data
df = pd.DataFrame({
    "customer_id": [1, 2, 3],
    "name": ["Alice", "Bob", "Charlie"],
    "email": ["alice@example.com", "bob@example.com", "charlie@example.com"],
})
table_upload.send(df)
print(f"Data uploaded. Status={table_upload.status}")

# Step 4: Complete the import batch
upload.complete(message="Upload completed successfully")
print(f"Import completed. Final status={upload.status}")

Incremental Upload

Use pattern="incremental" to add or update rows without replacing the full dataset:

upload = client.get_upload(
    branch_id=174,
    description="Incremental customer update",
    correlation_code="incremental-002",
)

table_upload = upload.get_table_upload(
    table_definition_key=456,
    import_mapping_key=123,
    pattern="incremental",
    correlation_code="incr-customers",
)

# Only send the changed rows
df_changes = df.head(5)
table_upload.send(df_changes)
upload.complete()

Incremental with Delete

Use pattern="incremental_with_delete" to also remove rows that are no longer present in the source:

table_upload = upload.get_table_upload(
    table_definition_key=456,
    import_mapping_key=123,
    pattern="incremental_with_delete",
)

Multi-Table Upload

Upload multiple tables in a single import batch:

upload = client.get_upload(
    branch_id=branch.id,
    description="Multi-table sync",
    correlation_code="multi-upload-001",
)

# Upload customers
customers_upload = upload.get_table_upload(
    table_definition_key=100,
    import_mapping_key=10,
    pattern="full",
)
customers_upload.send(customers_df)

# Upload orders
orders_upload = upload.get_table_upload(
    table_definition_key=101,
    import_mapping_key=11,
    pattern="full",
)
orders_upload.send(orders_df)

# Complete the batch (processes all tables)
upload.complete(message="Multi-table upload done")

Using Default Import Mappings

If you don't know the import mapping key, use the SDK to look it up:

# Get deployed table definition
tables = client.get_deployed_table_definitions(branch_id=5, domain_id=1)
table = tables[0]

# Get the default import mapping
import_mapping = client.get_default_import_mapping(table.id)

# Use it in your upload
table_upload = upload.get_table_upload(
    table_definition_key=table.key,
    import_mapping_key=import_mapping.key,
    pattern="full",
)

Upload Object Properties

Upload (Import Batch)

PropertyTypeDescription
idintImport batch ID
statusstrCurrent status
messagestrStatus message
descriptionstrImport description
correlation_codestrUser-defined tracking ID
insert_row_countintTotal rows inserted
update_row_countintTotal rows updated
delete_row_countintTotal rows deleted
duration_secsintProcessing duration

TableUpload

PropertyTypeDescription
idintTable upload ID
statusstrCurrent status
messagestrStatus message
correlation_codestrUser-defined tracking ID
file_sizeintUploaded file size in bytes
insert_row_countintRows inserted
update_row_countintRows updated
delete_row_countintRows deleted
duration_secsintProcessing duration

Parameters Reference

client.get_upload()

ParameterTypeRequiredDescription
branch_idintYesTarget branch ID
descriptionstrNoDescription of the import
correlation_codestrNoUser-defined tracking identifier

upload.get_table_upload()

ParameterTypeRequiredDescription
table_definition_keyintYesTarget table definition key
import_mapping_keyintYesImport mapping key
patternstrNo'full', 'incremental', or 'incremental_with_delete' (default: 'full')
correlation_codestrNoTracking identifier for this table upload

Next Steps