
Allow user to define a subset of columns for update detection in UPSERT #3598

Description

@AndrejIring

Feature Request / Improvement

Currently, when detecting which rows should be updated during an upsert, pyiceberg iterates over all non-primary-key columns, converts their values to Python types, and compares them. This can be slow and memory-intensive, especially for complex columns (e.g., structs or lists containing JSON-like data).
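To make the cost concrete, here is a simplified model of that comparison. Plain Python dicts stand in for rows that have already been converted to Python types. The function name `rows_to_update_all_cols` is hypothetical, and this is a sketch of the behaviour described above, not pyiceberg's actual implementation.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def rows_to_update_all_cols(source: List[Row], target: List[Row], key_cols: List[str]) -> List[Row]:
    """Return source rows whose key exists in target but whose non-key values differ.

    Every non-key column is compared, which is what makes wide or nested
    columns (structs, lists) expensive.
    """
    # Index existing target rows by their primary-key tuple
    target_by_key = {tuple(r[k] for k in key_cols): r for r in target}
    non_key_cols = [c for c in source[0] if c not in key_cols] if source else []

    to_update: List[Row] = []
    for row in source:
        existing = target_by_key.get(tuple(row[k] for k in key_cols))
        if existing is None:
            continue  # new row: handled by the insert path, not the update path
        # Compare every non-key column, including potentially large nested values
        if any(row[c] != existing[c] for c in non_key_cols):
            to_update.append(row)
    return to_update


target = [
    {"city": "Amsterdam", "inhabitants": 921402},
    {"city": "San Francisco", "inhabitants": 808988},
    {"city": "Drachten", "inhabitants": 45019},
    {"city": "Paris", "inhabitants": 2103000},
]
source = [
    {"city": "Drachten", "inhabitants": 45505},
    {"city": "Berlin", "inhabitants": 3432000},
    {"city": "Paris", "inhabitants": 2103000},
]
updated = rows_to_update_all_cols(source, target, key_cols=["city"])
print(updated)  # [{'city': 'Drachten', 'inhabitants': 45505}]
```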

If the user knows which columns can change, comparing only those columns would be a large performance improvement. The same applies if the user has a single column that changes whenever anything in the row changes (e.g., a hash of the data).

For example:
Hash column: if the user can add a column containing a hash of each row, upsert only needs to compare this column when detecting changes. This way, pyiceberg doesn't have to convert every column to a Python type and compare it.
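A minimal sketch of how a user might derive such a hash column before upserting, assuming rows are available as Python dicts. The helper `add_row_hash` and the column name `row_hash` are hypothetical; any stable hash would work. Serializing with `json.dumps(..., sort_keys=True)` gives a deterministic encoding even for nested struct/list values, so equal content yields equal hashes.

```python
import hashlib
import json
from typing import Any, Dict, List

Row = Dict[str, Any]


def add_row_hash(rows: List[Row], key_cols: List[str], hash_col: str = "row_hash") -> List[Row]:
    """Return copies of rows with an extra column holding a SHA-256 of the non-key values."""
    out: List[Row] = []
    for row in rows:
        # Hash only the non-key payload; skip any previously computed hash column
        payload = {k: v for k, v in row.items() if k not in key_cols and k != hash_col}
        # sort_keys (applied recursively) makes the encoding independent of dict order;
        # default=str handles values json cannot encode natively (e.g. dates)
        encoded = json.dumps(payload, sort_keys=True, default=str).encode("utf-8")
        out.append({**row, hash_col: hashlib.sha256(encoded).hexdigest()})
    return out


a = add_row_hash([{"city": "Paris", "info": {"pop": 2103000, "country": "FR"}}], ["city"])
b = add_row_hash([{"city": "Paris", "info": {"country": "FR", "pop": 2103000}}], ["city"])
c = add_row_hash([{"city": "Paris", "info": {"country": "FR", "pop": 2104000}}], ["city"])
print(a[0]["row_hash"] == b[0]["row_hash"])  # True: same content, different key order
print(a[0]["row_hash"] == c[0]["row_hash"])  # False: nested value changed
```

With such a column in place, change detection only has to compare one short string per row instead of every nested value.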

Proposal:

Update the function get_rows_to_update to also accept an optional parameter, difference_cols. Add the same parameter to the upsert methods and pass it through to get_rows_to_update.

If difference_cols has no intersection with the non-primary-key columns, pyiceberg can either raise an error or fall back to the default behaviour (iterating over all non-PK columns).
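A sketch of how the column selection could work, again modelling rows as Python dicts. The names `resolve_compare_cols`, `get_rows_to_update_sketch`, and the `strict` flag are hypothetical; since the proposal leaves the choice between raising and falling back open, the sketch supports both.

```python
from typing import Any, Dict, List, Optional, Sequence

Row = Dict[str, Any]


def resolve_compare_cols(
    all_cols: Sequence[str],
    key_cols: Sequence[str],
    difference_cols: Optional[Sequence[str]] = None,
    strict: bool = False,
) -> List[str]:
    """Pick which non-key columns to compare when detecting updates."""
    non_key = [c for c in all_cols if c not in key_cols]
    if difference_cols is None:
        return non_key  # default: compare every non-key column
    wanted = set(difference_cols)
    selected = [c for c in non_key if c in wanted]
    if not selected:
        if strict:
            raise ValueError(f"difference_cols {list(difference_cols)} share no columns with {non_key}")
        return non_key  # fall back to the default behaviour
    return selected


def get_rows_to_update_sketch(
    source: List[Row],
    target: List[Row],
    key_cols: Sequence[str],
    difference_cols: Optional[Sequence[str]] = None,
    strict: bool = False,
) -> List[Row]:
    """Return matched source rows that differ from target in the selected columns only."""
    if not source:
        return []
    compare_cols = resolve_compare_cols(list(source[0]), key_cols, difference_cols, strict)
    target_by_key = {tuple(r[k] for k in key_cols): r for r in target}
    result: List[Row] = []
    for row in source:
        existing = target_by_key.get(tuple(row[k] for k in key_cols))
        if existing is not None and any(row[c] != existing[c] for c in compare_cols):
            result.append(row)
    return result


target = [
    {"city": "Paris", "inhabitants": 2103000, "notes": "old"},
    {"city": "Drachten", "inhabitants": 45019, "notes": "x"},
]
source = [
    {"city": "Paris", "inhabitants": 2103000, "notes": "new"},
    {"city": "Drachten", "inhabitants": 45505, "notes": "x"},
]
print([r["city"] for r in get_rows_to_update_sketch(source, target, ["city"])])  # ['Paris', 'Drachten']
print([r["city"] for r in get_rows_to_update_sketch(source, target, ["city"], ["inhabitants"])])  # ['Drachten']
```

Note the trade-off: with difference_cols=["inhabitants"], the change to "notes" on Paris is not detected. That is exactly why the option only makes sense when the user knows which columns can change, or supplies a column (such as a hash) that reflects every change.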

Usage:

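import pyarrow as pa

from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import IntegerType, NestedField, StringType

# Assumes a catalog named "default" is configured (e.g., in .pyiceberg.yaml)
catalog = load_catalog("default")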

schema = Schema(
    NestedField(1, "city", StringType(), required=True),
    NestedField(2, "inhabitants", IntegerType(), required=True),
    # Mark city as the identifier field, also known as the primary key
    identifier_field_ids=[1]
)

tbl = catalog.create_table("default.cities", schema=schema)

arrow_schema = pa.schema(
    [
        pa.field("city", pa.string(), nullable=False),
        pa.field("inhabitants", pa.int32(), nullable=False),
    ]
)

# Write some data
df = pa.Table.from_pylist(
    [
        {"city": "Amsterdam", "inhabitants": 921402},
        {"city": "San Francisco", "inhabitants": 808988},
        {"city": "Drachten", "inhabitants": 45019},
        {"city": "Paris", "inhabitants": 2103000},
    ],
    schema=arrow_schema
)
tbl.append(df)

df = pa.Table.from_pylist(
    [
        # Will be updated: inhabitants has changed
        {"city": "Drachten", "inhabitants": 45505},

        # New row, will be inserted
        {"city": "Berlin", "inhabitants": 3432000},

        # Ignored: an identical row already exists in the table
        {"city": "Paris", "inhabitants": 2103000},
    ],
    schema=arrow_schema
)
# difference_cols is the proposed parameter: only "inhabitants" is compared
upd = tbl.upsert(df, difference_cols=["inhabitants"])

I have already prototyped this in my fork (commit 59d1833).
If the proposal is accepted, I can prepare a PR.
