
API

cleaning

cleaning_functions

clean_address(raw)

Given a raw address, first conduct baseline address cleaning, then identify and return the components of the address. Where possible, infer the correct city and state values from the given ZIP code.

Parameters:

    raw (str): A raw address. Required.

Returns:

    dict: A dictionary of address components.

Source code in src/chainlink/cleaning/cleaning_functions.py
def clean_address(raw: str) -> dict:
    """
    Given a raw address, first conduct baseline address cleaning, then
    identify and return the components of the address. Where possible, infer
    correct city and state value from the zip code given.

    Args:
        raw (str): A raw address.

    Returns:
        dict: A dictionary of address components.
    """
    if not isinstance(raw, str) or raw == "":
        return {
            "raw": raw,
            "address_number": None,
            "street_pre_directional": None,
            "street_name": None,
            "street_post_type": None,
            "unit_type": None,
            "unit_number": None,
            "subaddress_type": None,
            "subaddress_identifier": None,
            "city": None,
            "state": None,
            "postal_code": None,
            "street": None,
            "address_norm": None,
        }

    FIELD_NAMES = [
        "AddressNumber",
        "StreetNamePreDirectional",
        "StreetName",
        "StreetNamePostType",
        "OccupancyType",
        "OccupancyIdentifier",
        "SubaddressType",
        "SubaddressIdentifier",
        "PlaceName",
        "StateName",
        "ZipCode",
    ]

    # remove commas and periods, strip surrounding whitespace
    raw_stripped = re.sub(r",|\.", "", raw).strip()
    # replace # with UNIT
    to_normalize = re.sub(r"#", " UNIT ", raw_stripped)
    # replace multiple spaces with single space
    to_normalize = re.sub(r"\s+", " ", to_normalize)

    try:
        normalized = normalize_address_record(to_normalize)
        normalized = " ".join(value for value in normalized.values() if value is not None)

    except Exception:
        normalized = to_normalize

    try:
        tags = usaddress.tag(normalized)
        tags = dict(tags[0])

    # retain any successfully parsed fields
    except usaddress.RepeatedLabelError as e:
        tags = {}

        for parsed_field in e.parsed_string:
            value, label = parsed_field

            if label in FIELD_NAMES:
                tags[label] = value

    record = {
        "raw": raw,
        "address_number": tags.get("AddressNumber"),
        "street_pre_directional": tags.get("StreetNamePreDirectional"),
        "street_name": tags.get("StreetName"),
        "street_post_type": tags.get("StreetNamePostType"),
        "unit_type": tags.get("OccupancyType"),
        "unit_number": tags.get("OccupancyIdentifier"),
        "subaddress_type": tags.get("SubaddressType"),
        "subaddress_identifier": tags.get("SubaddressIdentifier"),
        "city": tags.get("PlaceName"),
        "state": tags.get("StateName"),
        "postal_code": tags.get("ZipCode"),
        "address_norm": str(re.sub(r"[^a-zA-Z0-9]+", "", raw).upper()),
    }

    if record["city"] is not None:
        # keep only letters and whitespace
        record["city"] = re.sub(r"[^A-Za-z\s]", "", record["city"]).strip()

        if record["city"] == "":
            record["city"] = None

    if record["street_name"] is not None:
        # strip stray commas and periods
        record["street_name"] = re.sub(r"[,.]", "", record["street_name"]).strip()
        # Remove unit from street name for cases where the address parser
        # erroneously included it
        record["street_name"] = re.sub(r"UNIT.*", "", record["street_name"]).strip()
        if record["street_name"] == "":
            record["street_name"] = None

    if record["unit_number"] is not None:
        # keep only letters and digits
        record["unit_number"] = re.sub(r"[^A-Za-z0-9]", "", record["unit_number"])

        if record["unit_number"] == "":
            record["unit_number"] = None

    # Overwrite city and state using uszip if the parsed state is not valid
    if record["state"] not in state_abbr or record["state"] is None:
        zip_city, zip_state = identify_state_city(record["postal_code"])

        # if don't find valid city, state, leave original
        if zip_city is not None:
            record["city"] = zip_city

        if zip_state is not None:
            record["state"] = zip_state

    street_fields = [
        "address_number",
        "street_pre_directional",
        "street_name",
        "street_post_type",
    ]
    record["street"] = " ".join([
        record[field] for field in street_fields if (record[field] is not None) and (record[field] != "")
    ])
    if (record["street"] == "") or (record["street"] == " "):
        record["street"] = None

    if suffixes.get(record["street_post_type"]):
        record["street_post_type"] = suffixes.get(record["street_post_type"])

    for key, value in record.items():
        record[key] = None if value == "" else value

    for k, v in record.items():
        if v is None:
            continue
        # Force everything to a Python string:
        if not isinstance(v, str):
            record[k] = str(v)

    return record
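
A minimal usage sketch (the import path is inferred from the source location above, and the exact parsed component values depend on the usaddress parser and the address normalizer):

    from chainlink.cleaning.cleaning_functions import clean_address

    record = clean_address("123 N. Main St, Apt 4, Springfield, IL 62704")

    # Assuming usaddress parses this example cleanly, the record looks roughly like:
    #   record["address_number"]          -> "123"
    #   record["street_pre_directional"]  -> "N"
    #   record["postal_code"]             -> "62704"
    #   record["street"]                  -> number, directional, name, and post type joined
    #   record["address_norm"]            -> "123NMAINSTAPT4SPRINGFIELDIL62704"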

clean_names(raw)

Given a raw name string, clean the name and return it. Contains conditional logic based on the source of the data to handle data-specific cleaning cases. Returns None if the name matches a list of excluded strings. Strips most non-alphanumeric characters.

Parameters:

    raw (str): A raw name string. Required.

Returns:

    str | None: A cleaned name string.

Source code in src/chainlink/cleaning/cleaning_functions.py
def clean_names(raw: str) -> str | None:
    """
    Given a raw name string, clean the name and return it. Contains conditional
    logic based on the source of the data to handle data-specific cleaning cases.
    Returns none if the name resembles a list of excluded strings. Strips
    most non-alphanumeric characters.

    Args:
        raw (str): A raw name string.

    Returns:
        str: A cleaned name string.
    """

    if re.search(EXCLUDED_PATTERNS, raw):
        return None

    name = raw.upper()

    name = name.replace("&", "AND").replace("-", " ").replace("@", "AT").replace("—", " ")

    name = re.sub(r"[^a-zA-Z0-9\s]", "", name)
    name = re.sub(r"\s{2,}", " ", name)
    if (name == "") or (name == " "):
        return None

    return name
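
A minimal sketch, assuming neither input matches the module-level EXCLUDED_PATTERNS:

    from chainlink.cleaning.cleaning_functions import clean_names

    clean_names("Smith & Sons, L.L.C.")   # -> "SMITH AND SONS LLC"
    clean_names("   ")                    # -> None (nothing left after cleaning)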

clean_zipcode(raw)

Modified from the function written by Anthony Moser of the deseguys project.

Returns a 5-digit zipcode from a string.

Parameters:

    raw (str | int): A zipcode. Required.

Returns:

    str: A 5-digit zipcode or an empty string.

Source code in src/chainlink/cleaning/cleaning_functions.py
def clean_zipcode(raw: str | int) -> str:
    """
    Modified from the function written by Anthony Moser of the deseguys project.

    Returns a 5-digit zipcode from a string.

    Args:
        raw (any): A zipcode.

    Returns:
        str: A 5-digit zipcode or an empty string.
    """
    try:
        zipcode = str(raw)[:5]
    except Exception:
        return ""
    else:
        return zipcode
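
Because the function simply truncates the string form of its input, the behaviour is easy to illustrate:

    clean_zipcode("60622-3105")   # -> "60622"
    clean_zipcode(60622)          # -> "60622"
    clean_zipcode(None)           # -> "None" (no digit validation is performed)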

identify_state_city(zipcode)

Use zipcode to look up state and city info using the uszipcode API.

Parameters:

    zipcode (str): A zipcode. Required.

Returns:

    tuple: A tuple of (city, state), or (None, None) if the lookup failed.

Source code in src/chainlink/cleaning/cleaning_functions.py
def identify_state_city(zipcode: str) -> tuple:
    """
    Use zipcode to look up state and city info using the uszipcode API.

    Args:
        zipcode (str): A zipcode.

    Returns:
        tuple: A tuple of city and state, or (None, None) if the lookup failed.
    """
    zipcode = clean_zipcode(zipcode)
    try:
        if zipcode in zip_cache:
            zip_city = zip_cache[zipcode]["city"]
            zip_state = zip_cache[zipcode]["state"]

            return (zip_city, zip_state)

        else:
            engine = SearchEngine()
            zipcode_obj = engine.by_zipcode(int(zipcode))

            zip_city = zipcode_obj.major_city.upper()
            zip_state = zipcode_obj.state

            zip_citystate = {"city": zip_city, "state": zip_state}
            zip_cache[zipcode] = zip_citystate

            return (zip_city, zip_state)

    # Handle cases where zip code is null or not a number
    except (AttributeError, TypeError, ValueError):
        return (None, None)
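
A hedged usage sketch; city and state values come from the uszipcode SearchEngine (and the module-level zip_cache), so results depend on that package's data:

    identify_state_city("60622")        # -> e.g. ("CHICAGO", "IL")
    identify_state_city("not a zip")    # -> (None, None), int() raises ValueError
    identify_state_city(None)           # -> (None, None)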

predict_org(name)

Given a string, predict whether or not the string is an organization name.

Parameters:

    name (str): An entity name. Required.

Returns:

    int: 1 if the name is an organization, 0 if the name is an individual.

Source code in src/chainlink/cleaning/cleaning_functions.py
def predict_org(name: str) -> int:
    """
    Given a string, predict whether or not the string is an organization name.

    Args:
        name (str): An entity name.

    Returns:
        int: 1 if the name is an organization, 0 if the name is an individual.
    """
    individual_names = re.compile(
        r"CURRENT OWNER|TAX PAYER OF|OWNER OF RECORD|PROPERTY OWNER",
        flags=re.IGNORECASE,
    )

    if (
        re.search("0-1", name)
        or re.search(ABB_PATTERNS, name)
        or re.search(WORD_PATTERNS, name)
        or re.search(EOL_PATTERNS, name)
    ):
        return 1

    # Check placeholder owner names after the org patterns above, because names
    # like "GX PROPERTY OWNER LLC" should still be classified as organizations
    if re.search(individual_names, name):
        return 0

    return 0
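
For example, assuming the module-level ABB_PATTERNS, WORD_PATTERNS, and EOL_PATTERNS (not shown here) match common corporate markers such as "LLC":

    predict_org("ACME PROPERTY HOLDINGS LLC")   # -> 1 (hits an organization pattern)
    predict_org("JOHN SMITH")                   # -> 0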

remove_initial_I(raw)

Remove the leading "I" or I" present for some names in corporation and LLC data, where the name was incorrectly entered in the style of "I, John Smith" instead of just "John Smith".

Source code in src/chainlink/cleaning/cleaning_functions.py
def remove_initial_I(raw: str) -> str:
    """
    Remove the "I" or I" present for some names in corporation
    and LLC data where name was incorrectly entered in the
    style of "I, John Smith" instead of just "John Smith"
    """
    if raw[:3] == '"I"':
        raw = raw[3:]
    if raw[:2] == 'I"':
        raw = raw[2:]
    return raw
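
The behaviour is a plain prefix strip:

    remove_initial_I('"I" JOHN SMITH')   # -> ' JOHN SMITH' (leading space remains)
    remove_initial_I('I" JANE DOE')      # -> ' JANE DOE'
    remove_initial_I('JOHN SMITH')       # -> 'JOHN SMITH' (unchanged)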

create_across_links(db_path, new_schema, existing_schema, link_exclusions)

For each entity in the existing_db list, create links between the new entity and the existing entity.

For each old_entity in existing_db:

    -find all the name links (old_entity.name to new_entity.name, etc.)
    -find all the address links (old_entity.address to new_entity.address, etc.)
        -match by raw address string
        -match by clean street string
        -if street id matches, match by unit
        -match street name and number if zipcode matches

Returns: None

Source code in src/chainlink/link/link_generic.py
def create_across_links(db_path: str | Path, new_schema: dict, existing_schema: dict, link_exclusions: list) -> None:
    """
    For each entity in the existing_db list, create links between the new entity
    and the existing entity.

    for old_entity in existing_db:
        -find all the name links old_entity.name to new_entity.name, etc.
        -find all the address links old_entity.address to new_entity.address, etc.
            -match by raw address string
            -match by clean street string
            -if street id matches, match by unit
            -match street name and number if zipcode matches

    Returns: None
    """

    new_entity = new_schema["schema_name"]

    new_entity_names = []
    new_entity_addresses = []

    # gather all the name and address columns for the new entity
    for table in new_schema["tables"]:
        for name_col in table["name_cols"]:
            new_entity_names.append((table["table_name"], table["id_col"], name_col))
        for address_col in table["address_cols"]:
            new_entity_addresses.append((table["table_name"], table["id_col"], address_col))

    # create name and address matches for each existing entity and new entity
    existing_entity = existing_schema["schema_name"]

    existing_entity_names = []
    existing_entity_addresses = []

    # gather all the name and address columns for this existing entity
    for table in existing_schema["tables"]:
        for name_col in table["name_cols"]:
            existing_entity_names.append((
                table["table_name"],
                table["id_col"],
                name_col,
            ))
        for address_col in table["address_cols"]:
            existing_entity_addresses.append((
                table["table_name"],
                table["id_col"],
                address_col,
            ))
    # generate name match combos
    name_combos = list(itertools.product(new_entity_names, existing_entity_names))

    # need to add in across table within entity combos

    for new, old in name_combos:
        left_table, left_ent_id, left_name = new
        right_table, right_ent_id, right_name = old

        execute_match(
            db_path=db_path,
            left_entity=new_entity,
            left_table=left_table,
            left_matching_col=left_name,
            left_ent_id=left_ent_id,
            match_type="name_match",
            left_matching_id=f"{left_name}_name_id",
            right_entity=existing_entity,
            right_table=right_table,
            right_matching_col=right_name,
            right_ent_id=right_ent_id,
            right_matching_id=f"{right_name}_name_id",
            link_exclusions=link_exclusions,
        )

    # generate address match combos
    address_combos = list(itertools.product(new_entity_addresses, existing_entity_addresses))

    for new, old in address_combos:
        left_table, left_ent_id, left_address = new
        right_table, right_ent_id, right_address = old

        execute_match_address(
            db_path=db_path,
            left_entity=new_entity,
            left_table=left_table,
            left_address=left_address,
            left_ent_id=left_ent_id,
            right_entity=existing_entity,
            right_table=right_table,
            right_address=right_address,
            right_ent_id=right_ent_id,
            skip_address=True,
            link_exclusions=link_exclusions,
        )
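
A hedged sketch of the expected inputs. The dictionary keys (schema_name, tables, table_name, id_col, name_cols, address_cols) are the ones this function reads; the entity, table, and column names below are hypothetical, and the corresponding schemas are assumed to already exist in the DuckDB database at db_path:

    new_schema = {
        "schema_name": "permits",
        "tables": [
            {
                "table_name": "permits_raw",
                "id_col": "permit_id",
                "name_cols": ["applicant_name"],
                "address_cols": ["site_address"],
            }
        ],
    }

    existing_schema = {
        "schema_name": "parcels",
        "tables": [
            {
                "table_name": "parcels_raw",
                "id_col": "pin",
                "name_cols": ["owner_name"],
                "address_cols": ["mailing_address"],
            }
        ],
    }

    create_across_links("chainlink.db", new_schema, existing_schema, link_exclusions=[])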

create_tfidf_across_links(db_path, new_schema, existing_schema, link_exclusions)

Create all TF-IDF fuzzy links across the new entity and an existing entity.

Returns: None

Source code in src/chainlink/link/link_generic.py
def create_tfidf_across_links(
    db_path: str | Path, new_schema: dict, existing_schema: dict, link_exclusions: list
) -> None:
    """
    create all fuzzy links across new entity and existing entity

    Returns: None
    """
    new_entity = new_schema["schema_name"]

    # gather all the name columns for the new entity
    new_entity_names = []
    new_entity_addresses = []

    for table in new_schema["tables"]:
        for name_col in table["name_cols"]:
            new_entity_names.append((table["table_name"], table["id_col"], name_col))
        for address_col in table["address_cols"]:
            new_entity_addresses.append((table["table_name"], table["id_col"], address_col))

    # go through all the existing entities / schemas

    existing_entity = existing_schema["schema_name"]

    existing_entity_names = []
    existing_entity_addresses = []

    # gather all the name and address columns for this existing entity
    for table in existing_schema["tables"]:
        for name_col in table["name_cols"]:
            existing_entity_names.append((
                table["table_name"],
                table["id_col"],
                name_col,
            ))
        for address_col in table["address_cols"]:
            existing_entity_addresses.append((
                table["table_name"],
                table["id_col"],
                address_col,
            ))
    # generate name match combos

    name_combos = list(itertools.product(new_entity_names, existing_entity_names))

    for new, old in name_combos:
        left_table, left_ent_id, left_name = new
        right_table, right_ent_id, right_name = old

        execute_fuzzy_link(
            db_path=db_path,
            left_entity=new_entity,
            left_table=left_table,
            left_ent_id=left_ent_id,
            left_name_col=left_name,
            right_entity=existing_entity,
            right_table=right_table,
            right_ent_id=right_ent_id,
            right_name_col=right_name,
            tfidf_table="entity.name_similarity",
            link_exclusions=link_exclusions,
        )

    # generate address match combos
    address_combos = list(itertools.product(new_entity_addresses, existing_entity_addresses))
    for new, old in address_combos:
        left_table, left_ent_id, left_address = new
        right_table, right_ent_id, right_address = old

        execute_address_fuzzy_link(
            db_path=db_path,
            left_entity=new_entity,
            left_table=left_table,
            left_ent_id=left_ent_id,
            left_address_col=left_address,
            right_entity=existing_entity,
            right_table=right_table,
            right_ent_id=right_ent_id,
            right_address_col=right_address,
            tfidf_table="entity.street_name_similarity",
            skip_address=True,
            link_exclusions=link_exclusions,
        )

    return None

create_tfidf_within_links(db_path, schema_config, link_exclusions)

Create TF-IDF fuzzy links within a single entity, both within each table and across the entity's tables.

Returns: None

Source code in src/chainlink/link/link_generic.py
def create_tfidf_within_links(db_path: str | Path, schema_config: dict, link_exclusions: list) -> None:
    """
    create tfidf links within entity

    Returns: None
    """

    new_entity = schema_config["schema_name"]
    within_entity_across_tables_names = []
    within_entity_across_tables_addresses = []
    # create fuzzy links
    # generate combos, need all within tables

    for table in schema_config["tables"]:
        # generate name matches combos
        name_combos = list(itertools.product(table["name_cols"], repeat=2))

        for left_name, right_name in name_combos:
            execute_fuzzy_link(
                db_path=db_path,
                left_entity=new_entity,
                left_table=table["table_name"],
                left_ent_id=table["id_col"],
                left_name_col=left_name,
                right_entity=new_entity,
                right_table=table["table_name"],
                right_ent_id=table["id_col"],
                right_name_col=right_name,
                tfidf_table="entity.name_similarity",
                link_exclusions=link_exclusions,
            )

        address_combos = list(itertools.product(table["address_cols"], repeat=2))
        for left_address, right_address in address_combos:
            execute_address_fuzzy_link(
                db_path=db_path,
                left_entity=new_entity,
                left_table=table["table_name"],
                left_ent_id=table["id_col"],
                left_address_col=left_address,
                right_entity=new_entity,
                right_table=table["table_name"],
                right_ent_id=table["id_col"],
                right_address_col=right_address,
                tfidf_table="entity.street_name_similarity",
                skip_address=True,
                link_exclusions=link_exclusions,
            )

        # for across tables within entity
        within_entity_across_tables_names.append([
            (name, table["table_name"], table["id_col"]) for name in table["name_cols"]
        ])
        within_entity_across_tables_addresses.append([
            (address, table["table_name"], table["id_col"]) for address in table["address_cols"]
        ])

    # generate combos, across tables within entity
    across_name_combos, across_address_combos = generate_combos_within_across_tables(
        within_entity_across_tables_names, within_entity_across_tables_addresses
    )

    for left, right in across_name_combos:
        left_name, left_table, left_ent_id = left
        right_name, right_table, right_ent_id = right

        execute_fuzzy_link(
            db_path=db_path,
            left_entity=new_entity,
            left_table=left_table,
            left_ent_id=left_ent_id,
            left_name_col=left_name,
            right_entity=new_entity,
            right_table=right_table,
            right_ent_id=right_ent_id,
            right_name_col=right_name,
            tfidf_table="entity.name_similarity",
            link_exclusions=link_exclusions,
        )

    for left, right in across_address_combos:
        left_address, left_table, left_ent_id = left
        right_address, right_table, right_ent_id = right
        execute_address_fuzzy_link(
            db_path=db_path,
            left_entity=new_entity,
            left_table=left_table,
            left_ent_id=left_ent_id,
            left_address_col=left_address,
            right_entity=new_entity,
            right_table=right_table,
            right_ent_id=right_ent_id,
            right_address_col=right_address,
            tfidf_table="entity.street_name_similarity",
            skip_address=True,
            link_exclusions=link_exclusions,
        )

create_within_links(db_path, schema_config, link_exclusions)

Creates exact string matches on name and address fields within a single entity, both within each table and across the entity's tables.

Returns: None

Source code in src/chainlink/link/link_generic.py
def create_within_links(db_path: str | Path, schema_config: dict, link_exclusions: list) -> None:
    """
    Creates exact string matches on name and address fields for entity and
    entity.

    For each file find the links with the file:
        -find all the name links includes name1 to name1, name1 to name2, etc.
        -find all the address links includes address1 to address1, address1 to address2, etc.
            -match by raw address string
            -match by clean street string
            -if street id matches, match by unit
            -match street name and number if zipcode matches
        -find all name and address links across tables within the entity

    Returns: None
    """

    entity = schema_config["schema_name"]

    within_entity_across_tables_names = []
    within_entity_across_tables_addresses = []

    # within each table
    for table_config in schema_config["tables"]:
        table = table_config["table_name"]

        if table_config.get("name_cols"):
            # generate name matches combos
            name_combos = list(itertools.product(table_config["name_cols"], repeat=2))

            for left_name, right_name in name_combos:
                execute_match(
                    db_path=db_path,
                    match_type="name_match",
                    left_entity=entity,
                    left_table=table,
                    left_matching_col=left_name,
                    left_matching_id=f"{left_name}_name_id",
                    left_ent_id=table_config["id_col"],
                    right_entity=entity,
                    right_table=table,
                    right_matching_col=right_name,
                    right_ent_id=table_config["id_col"],
                    right_matching_id=f"{right_name}_name_id",
                    link_exclusions=link_exclusions,
                )

        if table_config.get("address_cols"):
            # address within
            address_combos = list(itertools.product(table_config["address_cols"], repeat=2))

            for left_address, right_address in address_combos:
                execute_match_address(
                    db_path=db_path,
                    left_entity=entity,
                    left_table=table,
                    left_address=left_address,
                    left_ent_id=table_config["id_col"],
                    right_entity=entity,
                    right_table=table,
                    right_address=right_address,
                    right_ent_id=table_config["id_col"],
                    skip_address=True,
                    link_exclusions=link_exclusions,
                )

        # for across tables
        if table_config.get("name_cols"):
            within_entity_across_tables_names.append([
                (name, table, table_config["id_col"]) for name in table_config["name_cols"]
            ])
        if table_config.get("address_cols"):
            within_entity_across_tables_addresses.append([
                (address, table, table_config["id_col"]) for address in table_config["address_cols"]
            ])

    # generate combos across tables (default to empty lists so the loops below
    # are safe when no tables define name or address columns)
    across_name_combos: list = []
    across_address_combos: list = []
    if within_entity_across_tables_names or within_entity_across_tables_addresses:
        across_name_combos, across_address_combos = generate_combos_within_across_tables(
            within_entity_across_tables_names, within_entity_across_tables_addresses
        )

    # across files for name
    for left, right in across_name_combos:
        left_name, left_table, left_ent_id = left
        right_name, right_table, right_ent_id = right

        execute_match(
            db_path=db_path,
            match_type="name_match",
            left_entity=entity,
            left_table=left_table,
            left_matching_col=left_name,
            left_ent_id=left_ent_id,
            left_matching_id=f"{left_name}_name_id",
            right_entity=entity,
            right_table=right_table,
            right_matching_col=right_name,
            right_ent_id=right_ent_id,
            right_matching_id=f"{right_name}_name_id",
            link_exclusions=link_exclusions,
        )

    # across files for address
    for left, right in across_address_combos:
        left_address, left_table, left_ent_id = left
        right_address, right_table, right_ent_id = right

        execute_match_address(
            db_path=db_path,
            left_entity=entity,
            left_table=left_table,
            left_address=left_address,
            left_ent_id=left_ent_id,
            right_entity=entity,
            right_table=right_table,
            right_address=right_address,
            right_ent_id=right_ent_id,
            skip_address=True,
            link_exclusions=link_exclusions,
        )
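
Using a schema config shaped like the hypothetical one sketched under create_across_links above:

    create_within_links("chainlink.db", schema_config=new_schema, link_exclusions=[])

Tables whose config omits name_cols or address_cols are skipped for the corresponding match type.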

execute_address_fuzzy_link(db_path, left_entity, left_table, left_ent_id, left_address_col, right_entity, right_table, right_ent_id, right_address_col, tfidf_table, skip_address, link_exclusions)

Fuzzy address matching. Given two address columns and a TF-IDF street-name similarity table, creates street and unit fuzzy match columns and appends them to the link table link.{left_entity}_{right_entity}.

Source code in src/chainlink/link/link_utils.py
def execute_address_fuzzy_link(
    db_path: str | Path,
    left_entity: str,
    left_table: str,
    left_ent_id: str,
    left_address_col: str,
    right_entity: str,
    right_table: str,
    right_ent_id: str,
    right_address_col: str,
    tfidf_table: str = "link.tfidf_staging",
    skip_address: bool = False,
    link_exclusions: Optional[list] = None,
) -> None:
    """
    fuzzy address matching
    """
    if link_exclusions is None:
        link_exclusions = []

    link_table = f"link.{left_entity}_{right_entity}"

    # align the names of the match columns
    left_side = f"{left_entity}_{left_table}_{left_address_col}"
    right_side = f"{right_entity}_{right_table}_{right_address_col}"
    if left_entity != right_entity:
        match_name_stem = f"{left_side}_{right_side}" if left_side < right_side else f"{right_side}_{left_side}"
    else:
        match_name_stem = f"{left_side}_{right_side}"

    same_condition = "TRUE"

    if left_ent_id == right_ent_id and left_entity == right_entity:
        left_ent_id_rename = f"{left_ent_id}_1"
        right_ent_id_rename = f"{right_ent_id}_2"
        left_unit_num_rename = f"{left_address_col}_unit_number_1"
        right_unit_num_rename = f"{right_address_col}_unit_number_2"
        left_address_num_rename = f"{left_address_col}_address_number_1"
        right_address_num_rename = f"{right_address_col}_address_number_2"
        left_postal_code_rename = f"{left_address_col}_postal_code_1"
        right_postal_code_rename = f"{right_address_col}_postal_code_2"
        left_directional_rename = f"{left_address_col}_street_pre_directional_1"
        right_directional_rename = f"{right_address_col}_street_pre_directional_2"
        # if same id, want to remove dupes
        same_condition = f"{left_entity}_{left_ent_id_rename} < {right_entity}_{right_ent_id_rename}"
    else:
        left_ent_id_rename = left_ent_id
        right_ent_id_rename = right_ent_id
        left_unit_num_rename = f"{left_address_col}_unit_number"
        right_unit_num_rename = f"{right_address_col}_unit_number"
        left_address_num_rename = f"{left_address_col}_address_number"
        right_address_num_rename = f"{right_address_col}_address_number"
        left_postal_code_rename = f"{left_address_col}_postal_code"
        right_postal_code_rename = f"{right_address_col}_postal_code"
        left_directional_rename = f"{left_address_col}_street_pre_directional"
        right_directional_rename = f"{right_address_col}_street_pre_directional"

    if skip_address:
        address_condition = " != 1"
        left_address_condition = f"{left_address_col}_skip {address_condition}"
        right_address_condition = f"{right_address_col}_skip {address_condition}"
    else:
        left_address_condition = "TRUE"
        right_address_condition = "TRUE"

    match_names = [
        f"{match_name_stem}_street_fuzzy_match",
        f"{match_name_stem}_unit_fuzzy_match",
    ]

    conditions = [
        f"""
        {left_entity}_{left_address_num_rename} = {right_entity}_{right_address_num_rename} AND
        {left_entity}_{left_postal_code_rename} = {right_entity}_{right_postal_code_rename}""",
        f"""
        {left_entity}_{left_address_num_rename} = {right_entity}_{right_address_num_rename} AND
        {left_entity}_{left_postal_code_rename} = {right_entity}_{right_postal_code_rename} AND
        CAST({left_entity}_{left_unit_num_rename} AS VARCHAR) = CAST({right_entity}_{right_unit_num_rename} AS VARCHAR)
        """,
    ]
    for match_name, condition in zip(match_names, conditions):
        # check link exclusion
        if any(exclusion in match_name for exclusion in link_exclusions):
            return None
        query = f"""
        CREATE OR REPLACE TABLE {link_table} AS

        WITH tfidf_matches AS (
            SELECT id_a,
                id_b,
                similarity as {match_name}
            FROM {tfidf_table}
        ),

        left_source AS (
            SELECT {left_ent_id} as {left_entity}_{left_ent_id_rename},
                    {left_address_col}_street_name_id,
                    {left_address_col}_unit_number as {left_entity}_{left_unit_num_rename},
                    {left_address_col}_street_pre_directional as {left_entity}_{left_directional_rename},
                    {left_address_col}_address_number as {left_entity}_{left_address_num_rename},
                    {left_address_col}_postal_code as {left_entity}_{left_postal_code_rename}
            FROM {left_entity}.{left_table}
            WHERE {left_address_condition}
        ),

        right_source AS (
            SELECT {right_ent_id} as {right_entity}_{right_ent_id_rename},
                    {right_address_col}_street_name_id,
                    {right_address_col}_unit_number as {right_entity}_{right_unit_num_rename},
                    {right_address_col}_street_pre_directional as {right_entity}_{right_directional_rename},
                    {right_address_col}_address_number as {right_entity}_{right_address_num_rename},
                    {right_address_col}_postal_code as {right_entity}_{right_postal_code_rename}
            FROM {right_entity}.{right_table}
            WHERE {right_address_condition}
        ),

        fuzzy_match_1 AS (
            SELECT l.{left_entity}_{left_ent_id_rename},
                l.{left_entity}_{left_unit_num_rename}, l.{left_entity}_{left_address_num_rename},
                l.{left_entity}_{left_postal_code_rename}, l.{left_entity}_{left_directional_rename},
                r.{right_entity}_{right_ent_id_rename},
                r.{right_entity}_{right_unit_num_rename}, r.{right_entity}_{right_address_num_rename},
                r.{right_entity}_{right_postal_code_rename}, r.{right_entity}_{right_directional_rename},
                m.{match_name}
            FROM   tfidf_matches as m
            INNER JOIN left_source as l
                ON m.id_a = l.{left_address_col}_street_name_id
            INNER JOIN right_source as r
                ON m.id_b = r.{right_address_col}_street_name_id
        ),

        fuzzy_match_2 AS (
            SELECT l.{left_entity}_{left_ent_id_rename},
                l.{left_entity}_{left_unit_num_rename}, l.{left_entity}_{left_address_num_rename},
                l.{left_entity}_{left_postal_code_rename}, l.{left_entity}_{left_directional_rename},
                r.{right_entity}_{right_ent_id_rename},
                r.{right_entity}_{right_unit_num_rename}, r.{right_entity}_{right_address_num_rename},
                r.{right_entity}_{right_postal_code_rename}, r.{right_entity}_{right_directional_rename},
                m.{match_name}
            FROM   tfidf_matches as m
            INNER JOIN left_source as l
                ON m.id_b = l.{left_address_col}_street_name_id
            INNER JOIN right_source as r
                ON m.id_a = r.{right_address_col}_street_name_id
        ),

        all_fuzzy_matches AS (
            SELECT {left_entity}_{left_ent_id_rename},
                    {right_entity}_{right_ent_id_rename},
                    {match_name}
            FROM (SELECT * FROM fuzzy_match_1
                UNION
                SELECT * FROM fuzzy_match_2)
            WHERE {same_condition} AND
            {condition}

        ),

        existing_links AS (
            SELECT *
            FROM {link_table}
        )

        SELECT *
        FROM   all_fuzzy_matches
        FULL JOIN existing_links
            USING({left_entity}_{left_ent_id_rename},{right_entity}_{right_ent_id_rename})

        """

        with duckdb.connect(database=db_path, read_only=False) as db_conn:
            db_conn.execute(query)
            console.log(f"[yellow] Created {match_name}")
            logger.debug(f"Created {match_name}")
            cols = [row[1] for row in db_conn.execute(f"PRAGMA table_info('{link_table}')").fetchall()]
            for col in cols:
                db_conn.execute(f"UPDATE {link_table} SET {col} = 0 WHERE {col} IS NULL")

            # set datatype to int or float as expected
            if "fuzzy" in match_name:
                db_conn.execute(f"UPDATE {link_table} SET {match_name} = CAST({match_name} AS FLOAT)")
            else:
                db_conn.execute(f"UPDATE {link_table} SET {match_name} = CAST({match_name} AS INT1)")

    return None

execute_fuzzy_link(db_path, left_entity, left_table, left_ent_id, left_name_col, right_entity, right_table, right_ent_id, right_name_col, tfidf_table, link_exclusions)

Given two tables and a TF-IDF matching entity table, create a fuzzy match between the two tables. Creates a match column called {left_entity}_{left_table}_{left_name_col}_{right_entity}_{right_table}_{right_name_col}_fuzzy_match and appends it to the link table link.{left_entity}_{right_entity}.

Source code in src/chainlink/link/link_utils.py
def execute_fuzzy_link(
    db_path: str | Path,
    left_entity: str,
    left_table: str,
    left_ent_id: str,
    left_name_col: str,
    right_entity: str,
    right_table: str,
    right_ent_id: str,
    right_name_col: str,
    tfidf_table: str = "link.tfidf_staging",
    link_exclusions: Optional[list] = None,
) -> None:
    """

    Given two tables and a tfidf matching entity table, create a fuzzy match between the two tables.
    Creates a match column called
    {left_entity}_{left_table}_{left_name_col}_{right_entity}_{right_table}_{right_name_col}_fuzzy_match
    and appends to link table link.{left_entity}_{right_entity}
    """
    if link_exclusions is None:
        link_exclusions = []

    link_table = f"link.{left_entity}_{right_entity}"

    # align the names of the match columns
    left_side = f"{left_entity}_{left_table}_{left_name_col}"
    right_side = f"{right_entity}_{right_table}_{right_name_col}"
    if left_entity != right_entity:
        if left_side < right_side:
            match_name = f"{left_side}_{right_side}_fuzzy_match"
        else:
            match_name = f"{right_side}_{left_side}_fuzzy_match"
    else:
        match_name = f"{left_side}_{right_side}_fuzzy_match"

    # check link exclusion
    if any(exclusion in match_name for exclusion in link_exclusions):
        return None

    same_condition = "TRUE"

    if left_ent_id == right_ent_id and left_entity == right_entity:
        left_ent_id_rename = f"{left_ent_id}_1"
        right_ent_id_rename = f"{right_ent_id}_2"
        # if same id, want to remove dupes
        same_condition = f"{left_entity}_{left_ent_id_rename} < {right_entity}_{right_ent_id_rename}"
    else:
        left_ent_id_rename = left_ent_id
        right_ent_id_rename = right_ent_id

    query = f"""
    CREATE OR REPLACE TABLE {link_table} AS

    WITH tfidf_matches AS (
        SELECT id_a,
               id_b,
               similarity as {match_name}
        FROM {tfidf_table}
    ),

    left_source AS (
        SELECT {left_ent_id} as {left_entity}_{left_ent_id_rename},
                {left_name_col}_name_id
        FROM {left_entity}.{left_table}
    ),

    right_source AS (
        SELECT {right_ent_id} as {right_entity}_{right_ent_id_rename},
               {right_name_col}_name_id
        FROM {right_entity}.{right_table}
    ),

    fuzzy_match_1 AS (
        SELECT l.{left_entity}_{left_ent_id_rename},
               r.{right_entity}_{right_ent_id_rename},
               m.{match_name}
        FROM   tfidf_matches as m
        INNER JOIN left_source as l
            ON m.id_a = l.{left_name_col}_name_id
        INNER JOIN right_source as r
            ON m.id_b = r.{right_name_col}_name_id
    ),

    fuzzy_match_2 AS (
        SELECT l.{left_entity}_{left_ent_id_rename},
               r.{right_entity}_{right_ent_id_rename},
               m.{match_name}
        FROM   tfidf_matches as m
        INNER JOIN left_source as l
            ON m.id_b = l.{left_name_col}_name_id
        INNER JOIN right_source as r
            ON m.id_a = r.{right_name_col}_name_id
    ),

    all_fuzzy_matches AS (
        SELECT *
        FROM (SELECT * FROM fuzzy_match_1
              UNION
              SELECT * FROM fuzzy_match_2)
        where {same_condition}
    ),

    existing_links AS (
        SELECT *
        FROM {link_table}
    )

    SELECT *
    FROM   all_fuzzy_matches
    FULL JOIN existing_links
        USING({left_entity}_{left_ent_id_rename},{right_entity}_{right_ent_id_rename})

    """

    with duckdb.connect(database=db_path, read_only=False) as db_conn:
        db_conn.execute(query)
        console.log(f"[yellow] Created {match_name}")
        logger.debug(f"Created {match_name}")
        cols = [row[1] for row in db_conn.execute(f"PRAGMA table_info('{link_table}')").fetchall()]
        for col in cols:
            db_conn.execute(f"UPDATE {link_table} SET {col} = 0 WHERE {col} IS NULL")

        # set datatype to int or float as expected
        if "fuzzy" in match_name:
            db_conn.execute(f"UPDATE {link_table} SET {match_name} = CAST({match_name} AS FLOAT)")
        else:
            db_conn.execute(f"UPDATE {link_table} SET {match_name} = CAST({match_name} AS INT1)")

    return None

execute_match(db_path, match_type, left_entity, left_table, left_matching_col, left_matching_id, left_ent_id, right_entity, right_table, right_matching_col, right_matching_id, right_ent_id, skip_address, link_exclusions)

Exact matches between two columns in two tables. Creates a match column called {left_entity}_{left_table}_{left_matching_col}_{right_entity}_{right_table}_{right_matching_col}_{match_type} and appends it to the link table link.{left_entity}_{right_entity}.

Returns: None

Source code in src/chainlink/link/link_utils.py
def execute_match(
    db_path: str | Path,
    match_type: str,
    left_entity: str,
    left_table: str,
    left_matching_col: str,
    left_matching_id: str,
    left_ent_id: str,
    right_entity: str,
    right_table: str,
    right_matching_col: str,
    right_matching_id: str,
    right_ent_id: str,
    skip_address: bool = False,
    link_exclusions: Optional[list] = None,
) -> None:
    """
    Exact matches between two columns in two tables.
    Creates a match column called
    {left_entity}_{left_table}_{left_matching_col}_{right_entity}_{right_table}_{right_matching_col}_{match_type}
    and appends to link table link.{left_entity}_{right_entity}

    Returns: None
    """
    if link_exclusions is None:
        link_exclusions = []

    # if two different ids just dont want duplicates
    matching_condition = "!="

    if left_ent_id == right_ent_id and left_entity == right_entity:
        left_ent_id_edit = f"{left_ent_id}_1"
        right_ent_id_edit = f"{right_ent_id}_2"
        # if same id, only want one direction of matches
        matching_condition = "<"
    else:
        left_ent_id_edit = left_ent_id
        right_ent_id_edit = right_ent_id

    link_table = f"link.{left_entity}_{right_entity}"

    # align the names of the match columns
    left_side = f"{left_entity}_{left_table}_{left_matching_col}"
    right_side = f"{right_entity}_{right_table}_{right_matching_col}"
    if left_entity != right_entity:
        if left_side < right_side:
            match_name_col = f"{left_side}_{right_side}_{match_type}"
        else:
            match_name_col = f"{right_side}_{left_side}_{match_type}"
    else:
        match_name_col = f"{left_side}_{right_side}_{match_type}"

    # check link exclusion
    if any(exclusion in match_name_col for exclusion in link_exclusions):
        return None

    if skip_address:
        address_condition = " != 1"
        left_address_condition = f"l.{left_matching_col}_skip {address_condition}"
        right_address_condition = f"r.{right_matching_col}_skip {address_condition}"
        left_extra_col = f", {left_matching_col}_skip"
        right_extra_col = f", {right_matching_col}_skip"
    else:
        left_address_condition = "TRUE"
        right_address_condition = "TRUE"
        left_extra_col = ""
        right_extra_col = ""

    temp_table = match_name_col + "_table"

    matching_query = f"""
            CREATE SCHEMA IF NOT EXISTS link;

            CREATE OR REPLACE TABLE link.{temp_table} AS
            SELECT l.{left_entity}_{left_ent_id_edit},
                   r.{right_entity}_{right_ent_id_edit},
                   1 AS {match_name_col}
            FROM
                (SELECT {left_ent_id} AS {left_entity}_{left_ent_id_edit},
                        {left_matching_id} {left_extra_col}
                FROM {left_entity}.{left_table}
                ) as l
            JOIN
                (SELECT {right_ent_id} AS {right_entity}_{right_ent_id_edit},
                        {right_matching_id} {right_extra_col}
                FROM {right_entity}.{right_table}
                ) as r
                ON l.{left_matching_id} = r.{right_matching_id}
                AND l.{left_matching_id} IS NOT NULL
                AND r.{right_matching_id} IS NOT NULL
                AND l.{left_entity}_{left_ent_id_edit} {matching_condition} r.{right_entity}_{right_ent_id_edit}
            WHERE
                {left_address_condition}
                AND {right_address_condition}
        ;"""

    with duckdb.connect(database=db_path, read_only=False) as db_conn:
        db_conn.execute(matching_query)
        console.log(f"[yellow] Created {match_name_col}")
        logger.debug(f"Created {match_name_col}")

        execute_match_processing(
            db_conn=db_conn,
            link_table=link_table,
            out_temp_table_name=temp_table,
            id_col_1=f"{left_entity}_{left_ent_id_edit}",
            match_name_col=match_name_col,
            id_col_2=f"{right_entity}_{right_ent_id_edit}",
        )
        logger.debug(f"Finished match processing for {match_name_col}")

    return None
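
A hedged example of a single exact name match between two hypothetical entity tables; the *_name_id columns are assumed to have been created during cleaning and loading:

    execute_match(
        db_path="chainlink.db",
        match_type="name_match",
        left_entity="permits",
        left_table="permits_raw",
        left_matching_col="applicant_name",
        left_matching_id="applicant_name_name_id",
        left_ent_id="permit_id",
        right_entity="parcels",
        right_table="parcels_raw",
        right_matching_col="owner_name",
        right_matching_id="owner_name_name_id",
        right_ent_id="pin",
        link_exclusions=[],
    )
    # Adds a 0/1 name_match column to link.permits_parcels for every pair of ids
    # whose name ids are equal.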

execute_match_address(db_path, left_entity, left_table, left_address, left_ent_id, right_entity, right_table, right_address, right_ent_id, skip_address, link_exclusions)

Given two address columns, match the addresses:

    * match by raw address string
    * match by clean street string
    * if street id matches, match by unit
    * match street name and number if zipcode matches

Creates four match columns called {left_entity}_{left_table}_{left_matching_col}_{right_entity}_{right_table}_{right_matching_col}_{match_type} and appends them to the link table link.{left_entity}_{right_entity}.

Returns: None

Source code in src/chainlink/link/link_utils.py
def execute_match_address(
    db_path: str | Path,
    left_entity: str,
    left_table: str,
    left_address: str,
    left_ent_id: str,
    right_entity: str,
    right_table: str,
    right_address: str,
    right_ent_id: str,
    skip_address: bool = False,
    link_exclusions: Optional[list] = None,
) -> None:
    """
    Given two address columns, match the addresses:
        * match by raw address string
        * match by clean street string
        * if street id matches, match by unit
        * match street name and number if zipcode matches

    Creates four match columns called
    {left_entity}_{left_table}_{left_matching_col}_{right_entity}_{right_table}_{right_matching_col}_{match_type}
    and appends to link table link.{left_entity}_{right_entity}



    Returns: None
    """
    if link_exclusions is None:
        link_exclusions = []

    ## Match by raw address string and by street id
    for match in ["street", "address"]:
        logger.debug(f"Executing {match} match")
        execute_match(
            db_path=db_path,
            match_type=f"{match}_match",
            left_entity=left_entity,
            left_table=left_table,
            left_matching_col=left_address,
            left_matching_id=f"{left_address}_{match}_id",
            left_ent_id=left_ent_id,
            right_entity=right_entity,
            right_table=right_table,
            right_matching_col=right_address,
            right_matching_id=f"{right_address}_{match}_id",
            right_ent_id=right_ent_id,
            skip_address=skip_address,
            link_exclusions=link_exclusions,
        )

    ## If street id matches, match by unit
    left_side = f"{left_entity}_{left_table}_{left_address}"
    right_side = f"{right_entity}_{right_table}_{right_address}"
    if left_entity != right_entity:
        if left_side < right_side:
            street_match_to_check = f"{left_side}_{right_side}_street_match"
        else:
            street_match_to_check = f"{right_side}_{left_side}_street_match"
    else:
        street_match_to_check = f"{left_side}_{right_side}_street_match"

    execute_match_unit(
        db_path=db_path,
        left_entity=left_entity,
        right_entity=right_entity,
        # TODO: will ordering of left and right address mess things up?
        street_match_to_check=street_match_to_check,
        left_table=left_table,
        left_address=left_address,
        left_ent_id=left_ent_id,
        right_table=right_table,
        right_address=right_address,
        right_ent_id=right_ent_id,
        skip_address=skip_address,
        link_exclusions=link_exclusions,
    )

execute_match_processing(db_conn, link_table, out_temp_table_name, id_col_1, match_name_col, id_col_2)

Steps to run after matches are created: append matches to the link table, set null matches to 0, and drop the temporary table of matches. Runs inside execute_match().

Returns: None

Source code in src/chainlink/link/link_utils.py
def execute_match_processing(
    db_conn: DuckDBPyConnection,
    link_table: str,
    out_temp_table_name: str,
    id_col_1: str,
    match_name_col: str,
    id_col_2: str,
) -> None:
    """
    Steps to run after matches are created.
    append matches to link table, set null matches to 0, and drop temp table of matches
    runs in execute_match()

    Returns: None
    """
    # check if link table exists
    link_table_check = db_conn.execute(
        f"""SELECT COUNT(*)
        FROM information_schema.tables
        WHERE table_name = '{link_table.split(".")[1]}'
               and table_schema = '{link_table.split(".")[0]}'"""
    ).fetchone()[0]

    link_table_exists = link_table_check != 0

    # append to link table
    db_conn.execute(query_append_to_links(link_table_exists, link_table, out_temp_table_name, id_col_1, id_col_2))

    # set null matches to 0
    db_conn.execute(f"UPDATE {link_table} SET {match_name_col} = 0 WHERE {match_name_col} IS NULL")

    cols = [row[1] for row in db_conn.execute(f"PRAGMA table_info('{link_table}')").fetchall()]
    for col in cols:
        db_conn.execute(f"UPDATE {link_table} SET {col} = 0 WHERE {col} IS NULL")

    # set datatype to int or float as expected
    if "fuzzy" in match_name_col:
        db_conn.execute(f"UPDATE {link_table} SET {match_name_col} = CAST({match_name_col} AS FLOAT)")
    else:
        db_conn.execute(f"UPDATE {link_table} SET {match_name_col} = CAST({match_name_col} AS INT1)")

    # drop temp table of matches
    db_conn.execute(f"DROP TABLE link.{out_temp_table_name}")

execute_match_unit(db_path, left_entity, right_entity, street_match_to_check, left_table, left_address, left_ent_id, right_table, right_address, right_ent_id, skip_address, link_exclusions)

Given two address columns, if the street id matches, match by unit.

Creates a match column called {left_entity}_{left_table}_{left_address}_{right_entity}_{right_table}_{right_address}_unit_match and appends it to the link table link.{left_entity}_{right_entity}.

Source code in src/chainlink/link/link_utils.py
def execute_match_unit(
    db_path: str | Path,
    left_entity: str,
    right_entity: str,
    street_match_to_check: str,
    left_table: str,
    left_address: str,
    left_ent_id: str,
    right_table: str,
    right_address: str,
    right_ent_id: str,
    skip_address: bool = False,
    link_exclusions: Optional[list] = None,
) -> None:
    """
    Given two address columns, if street id matches, match by unit.

    Creates a match column called
    {left_entity}_{left_table}_{left_address}_{right_entity}_{right_table}_{right_address}_unit_match
    and appends to link table link.{left_entity}_{right_entity}
    """
    if link_exclusions is None:
        link_exclusions = []

    # if same id, only want one direction of matches
    if left_ent_id == right_ent_id and left_entity == right_entity:
        left_ent_id_edit = f"{left_ent_id}_1"
        right_ent_id_edit = f"{right_ent_id}_2"
    else:
        left_ent_id_edit = left_ent_id
        right_ent_id_edit = right_ent_id

    link_table = f"link.{left_entity}_{right_entity}"

    # align the names of the match columns
    left_side = f"{left_entity}_{left_table}_{left_address}"
    right_side = f"{right_entity}_{right_table}_{right_address}"
    if left_entity != right_entity:
        if left_side < right_side:
            match_name_col = f"{left_side}_{right_side}_unit_match"
        else:
            match_name_col = f"{right_side}_{left_side}_unit_match"
    else:
        match_name_col = f"{left_side}_{right_side}_unit_match"
    # check link exclusion
    if any(exclusion in match_name_col for exclusion in link_exclusions):
        return None

    if skip_address:
        address_condition = " != 1"
        left_address_condition = f"{left_address}_skip {address_condition}"
        right_address_condition = f"{right_address}_skip {address_condition}"
    else:
        left_address_condition = "TRUE"
        right_address_condition = "TRUE"

    temp_table = match_name_col + "_table"

    matching_query = f"""

        CREATE OR REPLACE TABLE link.{temp_table} AS

        WITH link as (
            SELECT {left_entity}_{left_ent_id_edit},
                    {right_entity}_{right_ent_id_edit}
            FROM {link_table}
            WHERE {street_match_to_check} = 1
            )

        ,lhs as (
            SELECT {left_ent_id} AS {left_entity}_{left_ent_id_edit},
                    {left_address}_unit_number AS unit_1
            FROM {left_entity}.{left_table}
            where {left_address_condition}

            )

        , rhs as (
            SELECT {right_ent_id} AS {right_entity}_{right_ent_id_edit},
                    {right_address}_unit_number AS unit_2
            FROM {right_entity}.{right_table}
            where {right_address_condition}
            )

        SELECT {left_entity}_{left_ent_id_edit},
               {right_entity}_{right_ent_id_edit},
               1 AS {match_name_col}
        FROM link
        LEFT JOIN lhs
        USING({left_entity}_{left_ent_id_edit})
        LEFT JOIN rhs
        USING({right_entity}_{right_ent_id_edit})
        WHERE unit_1 IS NOT NULL
        AND unit_2 IS NOT NULL
        AND CAST(unit_1 AS VARCHAR) = CAST(unit_2 AS VARCHAR);"""

    with duckdb.connect(database=db_path, read_only=False) as db_conn:
        db_conn.execute(matching_query)
        console.log(f"[yellow] Created {match_name_col}")
        logger.debug(f"Created {match_name_col}")

        execute_match_processing(
            db_conn=db_conn,
            link_table=link_table,
            out_temp_table_name=temp_table,
            id_col_1=f"{left_entity}_{left_ent_id_edit}",
            match_name_col=match_name_col,
            id_col_2=f"{right_entity}_{right_ent_id_edit}",
        )

    return None

generate_combos_within_across_tables(name_idx, address_idx=None)

Create all possible combinations across tables in the same entity, but do not include combos within the same table. If address_idx is not empty, also create across combos between the address tables.
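
A minimal sketch of the pairs this produces (toy index lists; assumes the package is importable as chainlink):

from chainlink.link.link_utils import generate_combos_within_across_tables

# two "name" tables, identified here by toy column indices
name_idx = [["a1", "a2"], ["b1"]]

across_names, across_addresses = generate_combos_within_across_tables(name_idx)
# across_names     -> [("a1", "b1"), ("a2", "b1"), ("b1", "a1"), ("b1", "a2")]
# across_addresses -> [] (no address_idx supplied)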

Source code in src/chainlink/link/link_utils.py
def generate_combos_within_across_tables(name_idx: list, address_idx: Optional[list] = None) -> tuple:
    """
    create all possible combinations of across tables in the same entity,
    but do not include combos within the same table
    if address_idx is not empty, also create across combos between address tables
    """
    if address_idx is None:
        address_idx = []

    across_combos_name_idx = list(itertools.combinations(range(len(name_idx)), 2))
    across_name_combos: list = []
    for i, j in across_combos_name_idx:
        across_name_combos += itertools.product(name_idx[i], name_idx[j])
        across_name_combos += itertools.product(name_idx[j], name_idx[i])

    if len(address_idx) > 0:
        across_address_combos: list = []
        across_combos_address_idx = list(itertools.combinations(range(len(address_idx)), 2))
        for i, j in across_combos_address_idx:
            across_address_combos += itertools.product(address_idx[i], address_idx[j])
            across_address_combos += itertools.product(address_idx[j], address_idx[i])

        return across_name_combos, across_address_combos

    else:
        return across_name_combos, []

generate_tfidf_links(db_path, table_location='entity.name_similarity', source_table_name=None)

Creates a table of TF-IDF matches between entity names and adds it to the database.

Returns: None
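
An illustrative call, mirroring how the main pipeline invokes it for street names (assumes a database already populated by the load step):

generate_tfidf_links(
    db_path="db/linked.db",
    table_location="entity.street_name_similarity",
    source_table_name="entity.street_name",
)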

Source code in src/chainlink/link/link_utils.py
def generate_tfidf_links(
    db_path: str | Path,
    table_location: str = "entity.name_similarity",
    source_table_name: str | None = None,
) -> None:
    """
    create a table of tfidf matches between two entities and adds to db

    Returns: None
    """

    console.log("[yellow] Process started")
    logger.info("Process started")

    # retrieve entity list, print length of dataframe
    entity_list = database_query(db_path, table_name=source_table_name)
    console.log(f"[yellow] Query retrieved {len(entity_list)} rows")
    logger.debug(f"Query retrieved {len(entity_list)} rows")

    # returns a pandas df
    entity_col = entity_list.columns[0]
    id_col = entity_list.columns[1]
    matches_df = superfast_tfidf(entity_list, id_col, entity_col)

    console.log("[yellow] Fuzzy Matching done")
    logger.info("Fuzzy Matching done")

    # load back to db
    with duckdb.connect(database=db_path, read_only=False) as db_conn:
        query = f"""CREATE OR REPLACE TABLE {table_location} AS
                    SELECT *
                    FROM  matches_df"""

        db_conn.execute(query)

query_append_to_links(link_table_exists, link_table, table_to_append, id_col1, id_col2)

Builds the query that appends links to the link table; runs in execute_match_processing().

Source code in src/chainlink/link/link_utils.py
def query_append_to_links(
    link_table_exists: bool,
    link_table: str,
    table_to_append: str,
    id_col1: str,
    id_col2: str,
) -> str:
    """
    query to append links to link table
    runs in execute_match_processing()
    """

    # if link table does not exist then its just table to append
    if link_table_exists:
        query = f"""
        CREATE OR REPLACE TABLE {link_table} AS
        SELECT DISTINCT *
        FROM {link_table}
        FULL JOIN link.{table_to_append}
        USING({id_col1}, {id_col2})"""

    else:
        query = f"""
        CREATE OR REPLACE TABLE {link_table} AS
        SELECT DISTINCT *
        FROM link.{table_to_append}"""

    return query

tfidf_utils

adjust_and_replace(string)

Replace specified words with blanks and other words with their corresponding replacement values before n-gram generation.
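
A rough sketch of the idea with hypothetical replacement tables (the real blank_words and flat_ngram_adj live in tfidf_utils and will differ):

blank_words = {"THE"}                 # hypothetical
flat_ngram_adj = {"CO": "COMPANY"}    # hypothetical

def adjust_and_replace_sketch(string: str) -> str:
    # drop punctuation, then drop blank words and swap in replacements, joining without spaces
    string = string.replace(",", "").replace("-", "").replace(".", "").replace("/", "")
    parts = string.split()
    return "".join("" if p in blank_words else flat_ngram_adj.get(p, p) for p in parts).strip()

adjust_and_replace_sketch("THE ACME CO.")  # -> "ACMECOMPANY"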

Source code in src/chainlink/link/tfidf_utils.py
def adjust_and_replace(string: str) -> str:
    """
    replace specified words with blanks and other words with their corresponding values for ngrams
    """

    # remove punctuation
    string = re.sub(r"[,-./]", r"", string)

    # split the string into words
    parts = string.split()

    # replace words based on blank_words and flat_ngram_adj using list comprehension
    adjusted_string = "".join(["" if part in blank_words else flat_ngram_adj.get(part, part) for part in parts])

    return adjusted_string.strip()

clean_matches(matches_df)

Remove self-matches and duplicates from the match dataframe.

Returns: pl.DataFrame
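
A toy example of the dedup behaviour (assuming the function is imported from chainlink.link.tfidf_utils):

import polars as pl
from chainlink.link.tfidf_utils import clean_matches

matches = pl.DataFrame({
    "id_a": [1, 1, 2],
    "id_b": [1, 2, 1],
    "similarity": [1.0, 0.9, 0.9],
})
clean_matches(matches)
# the self match (1, 1) is dropped and (1, 2) / (2, 1) collapse to a single row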

Source code in src/chainlink/link/tfidf_utils.py
def clean_matches(matches_df: pl.DataFrame) -> pl.DataFrame:
    """
    remove self matches and duplicates in match dataframe

    Returns: pl.DataFrame
    """

    # create copy to make adjustments
    # matches_df = matches_df.copy()
    # remove self matches, duplicates and sort
    matches_df = (
        matches_df.filter(pl.col("id_a") != pl.col("id_b"))
        .with_columns(pl.concat_list(pl.col("id_a", "id_b")).list.sort().alias("sorted_id_pairs"))
        .unique("sorted_id_pairs")
        .drop("sorted_id_pairs")
        .sort("similarity", descending=True)
    )

    return matches_df

database_query(db_path, table_name=None, limit=None)

queries entities for comparison

Source code in src/chainlink/link/tfidf_utils.py
def database_query(db_path: str | Path, table_name: str | None = None, limit: int | None = None) -> pl.DataFrame:
    """
    queries entities for comparison
    """
    if table_name is None:
        table_name = "entity.name"
        id_col = "name_id"
    else:
        id_col = table_name.split(".")[1] + "_id"

    # start connection with woc db
    with duckdb.connect(db_path) as conn:
        entity_query = f"""
        SELECT entity, {id_col}
        FROM {table_name}
        """

        # retrieve entity list (all unique names in parcel, llc and corp data)
        entity_list = conn.execute(entity_query).pl()

        # randomized sample for limit
        if limit is not None:
            entity_list = entity_list.sample(n=limit)

    return entity_list

get_matches_df(sparse_matrix, name_vector, top=None)

Create a matches dataframe given a matrix of n-grams.

References:

    sparse_matrix - matrix from vectorized comparison calculations
    name_vector - list of names to compare
    id_vector - id of distinct name from entities list

Source code in src/chainlink/link/tfidf_utils.py
def get_matches_df(sparse_matrix: csr_matrix, name_vector: np.ndarray, top: int | None = None) -> pl.DataFrame:
    """
    create a matches dataframe given matrix of ngrams
    references
        sparse_matrix - matrix from vectorized comparison calculations
        name_vector - list of names to compare
        id_vector - id of distinct name from entities list
    """
    non_zeros = sparse_matrix.nonzero()

    sparserows = non_zeros[0]
    sparsecols = non_zeros[1]

    nr_matches = top if top else sparsecols.size

    entity_a = np.empty([nr_matches], dtype=object)
    entity_b = np.empty([nr_matches], dtype=object)
    similarity = np.zeros(nr_matches)

    for index in range(0, nr_matches):
        entity_a[index] = name_vector[sparserows[index]]
        entity_b[index] = name_vector[sparsecols[index]]
        similarity[index] = sparse_matrix.data[index]

    data = {
        "entity_a": entity_a,
        "entity_b": entity_b,
        "similarity": similarity,
    }
    df = pl.DataFrame(data).with_columns(
        pl.col("entity_a").hash().alias("id_a"), pl.col("entity_b").hash().alias("id_b")
    )
    return df

ngrams(string, n=3)

split string into substrings of length n, return list of substrings
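
For intuition, a standalone sketch of the windowing step (the real function first runs adjust_and_replace, so its output will differ):

def char_ngrams_sketch(s: str, n: int = 3) -> list[str]:
    # slide a window of length n across the string
    return ["".join(gram) for gram in zip(*[s[i:] for i in range(n)])]

char_ngrams_sketch("ACME LLC")
# -> ["ACM", "CME", "ME ", "E L", " LL", "LLC"]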

Source code in src/chainlink/link/tfidf_utils.py
def ngrams(string: str, n: int = 3) -> list:
    """
    split string into substrings of length n, return list of substrings
    """
    pre_processing = adjust_and_replace(string)
    ngrams = zip(*[pre_processing[i:] for i in range(n)])
    return ["".join(ngram) for ngram in ngrams]

superfast_tfidf(entity_list, id_col='name_id', entity_col='entity')

Returns a sorted list of the top matched names.
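
A minimal usage sketch with toy data (assumes the package is importable as chainlink; the internal similarity cutoff is 0.8):

import polars as pl
from chainlink.link.tfidf_utils import superfast_tfidf

entities = pl.DataFrame({
    "entity": ["ACME HOLDINGS LLC", "ACME HOLDING LLC", "WIDGET INDUSTRIES INC"],
    "name_id": ["1", "2", "3"],
})
matches = superfast_tfidf(entities, id_col="name_id", entity_col="entity")
# likely a single row pairing the two ACME names, with hashed id_a / id_b columns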

Source code in src/chainlink/link/tfidf_utils.py
def superfast_tfidf(entity_list: pl.DataFrame, id_col: str = "name_id", entity_col: str = "entity") -> pl.DataFrame:
    """
    returns sorted list of top matched names
    """

    # matching

    entity_list = entity_list.filter(~pl.col(entity_col).is_null())
    company_names = entity_list.select(entity_col).to_series()
    if len(company_names) < 2:
        matches_df = pl.DataFrame(data={"entity_a": [], "entity_b": [], "similarity": [], "id_a": [], "id_b": []})
        return matches_df
    vectorizer = TfidfVectorizer(min_df=1, analyzer=ngrams)
    tf_idf_matrix = vectorizer.fit_transform(company_names.to_numpy())
    matches = ct.sp_matmul_topn(tf_idf_matrix, tf_idf_matrix.transpose(), 50, 0.8, sort=True, n_threads=-1)
    matches_df = get_matches_df(sparse_matrix=matches, name_vector=company_names.to_numpy())
    matches_df = clean_matches(matches_df)

    return matches_df

load

load_generic

load_generic(db_path, schema_config, bad_addresses)

Loads a generic file into the database.

Reads the config file, loops through each file listed, cleans the data, creates a unique id for name, street, and street_name, loads the cleaned files into the database using the schema name from the config file, and lastly updates the entity name tables.

Returns None.
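
For orientation, a minimal schema_config of the shape this function expects (the file path and column names are hypothetical; the *_og keys hold the original spellings and are normally filled in by chainlink()):

schema_config = {
    "schema_name": "corp",
    "tables": [
        {
            "table_name": "llc",
            "table_name_path": "data/llc.csv",      # hypothetical file
            "id_col": "file_number",
            "id_col_og": "File Number",
            "name_cols": ["company_name"],
            "name_cols_og": ["Company Name"],
            "address_cols": ["office_address"],
            "address_cols_og": ["Office Address"],
        }
    ],
}

load_generic(db_path="db/linked.db", schema_config=schema_config, bad_addresses=[])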

Source code in src/chainlink/load/load_generic.py
def load_generic(db_path: str | Path, schema_config: dict, bad_addresses: list) -> None:
    """
    Loads a generic file into the database.

    Reads config file, loops through each file listed, cleans the data,
    creates a unique id for name, street, and street_name,
    loads into cleaned files into a database using the schema name from the config file,
    and lastly updates the entity name files.

    Returns None.
    """

    schema_name = schema_config["schema_name"]

    with duckdb.connect(db_path, read_only=False) as conn:
        for table_config in schema_config["tables"]:
            # Read the data
            console.log(f"[yellow] Data: {table_config['table_name']} -- Reading data")
            logger.info(f"Data: {table_config['table_name']} -- Reading data")
            file_path = table_config.get("table_name_path")
            if not file_path:
                raise ValueError(f"No file path provided for table: {table_config['table_name']}")

            if not os.path.exists(file_path):
                raise FileNotFoundError(f"Data file not found: {file_path}")

            file_extension = file_path.split(".")[-1].lower()
            if file_extension not in ["csv", "parquet"]:
                raise ValueError(f"Unsupported file format: {file_extension}. Supported formats: csv, parquet")

            try:
                df = (
                    pl.read_csv(file_path, infer_schema=False)
                    if file_extension == "csv"
                    else pl.read_parquet(file_path)
                )
                # convert all columns to string
                df = df.cast(pl.String)

            except Exception as e:
                raise Exception(f"Error reading file {file_path}: {e!s}") from None

            validate_input_data(df, table_config)

            # Clean the data and create ids
            console.log(f"""[yellow] Data: {table_config["table_name"]} -- Starting cleaning""")
            logger.info(f"""Data: {table_config["table_name"]} -- Starting cleaning""")

            all_columns = []
            all_columns.append(table_config["id_col_og"])
            for col in table_config.get("name_cols_og", ""):
                all_columns.append(col)
            for col in table_config.get("address_cols_og", ""):
                all_columns.append(col)

            # Make headers snake case
            df.columns = [x.lower().replace(" ", "_") for x in df.columns]

            df = clean_generic(df, table_config)

            # load the data to db
            console.log(f"""[yellow] Data: {table_config["table_name"]} -- Starting load""")

            table_name = table_config["table_name"]
            load_to_db(
                df=df,
                table_name=table_name,
                db_conn=conn,
                schema=schema_name,
            )

            # add new names to entity_names table
            console.log(f"""[yellow] Data: {table_config["table_name"]} -- Updating entity name tables""")
            logger.info(f"""Data: {table_config["table_name"]} -- Updating entity name tables""")

            all_id_cols = ["name_id", "address_id", "street_id", "street_name_id"]

            id_cols = []
            for col in df.columns:
                if any(c in col for c in all_id_cols) and "subaddress_identifier" not in col:
                    id_cols.append(col)

            for col in id_cols:
                update_entity_ids(df=df, entity_id_col=col, db_conn=conn)

            # create bad address flag
            if table_config.get("address_cols"):
                for col in table_config["address_cols"]:
                    execute_flag_bad_addresses(
                        db_conn=conn,
                        table=f"{schema_name}.{table_name}",
                        address_col=col,
                        bad_addresses=bad_addresses,
                    )

load_utils

clean_generic(df, config)

Cleans the name and address for a generic file. Appends a new column with the cleaned name and address.

Returns a pl.DataFrame

Source code in src/chainlink/load/load_utils.py
def clean_generic(df: pl.DataFrame, config: dict) -> pl.DataFrame:
    """
    Cleans the name and address for a generic file. Appends a new
    column with the cleaned name and address.

    Returns a pl.DataFrame
    """

    # Clean the name
    for col in config["name_cols"]:
        # lower snake case
        col = col.lower().replace(" ", "_")

        raw_name = col + "_raw"
        id_col_name = col + "_name"

        # edge case: the input already has a {col}_raw column; drop it so the rename below succeeds
        if raw_name in df.columns:
            df = df.drop(raw_name)
        df = (
            df.rename({col: raw_name})
            .with_columns(
                pl.col(raw_name)
                .fill_null("")
                .str.to_uppercase()
                .map_elements(clean_names, return_dtype=pl.String)
                .alias(col)
            )
            .with_columns(pl.col(col).alias(id_col_name))
        )

        df = create_id_col(df, id_col_name)
        df = df.drop(id_col_name)

    # Clean the address
    if config.get("address_cols"):
        for col in config["address_cols"]:
            # lower snake case
            col = col.lower().replace(" ", "_")

            raw_address = col + "_raw"
            temp_address = "temp_" + col
            console.log(f"[yellow] Cleaning address column {col}")

            df = df.with_columns(
                pl.col(col).fill_null("").str.to_uppercase().alias(raw_address),
                pl.col(col).fill_null("").str.to_uppercase().alias(temp_address),
            )
            df = df.with_columns(
                pl.col(temp_address).map_elements(
                    clean_address,
                    return_dtype=pl.Struct([
                        pl.Field("raw", pl.Utf8),
                        pl.Field("address_number", pl.Utf8),
                        pl.Field("street_pre_directional", pl.Utf8),
                        pl.Field("street_name", pl.Utf8),
                        pl.Field("street_post_type", pl.Utf8),
                        pl.Field("unit_type", pl.Utf8),
                        pl.Field("unit_number", pl.Utf8),
                        pl.Field("subaddress_type", pl.Utf8),
                        pl.Field("subaddress_identifier", pl.Utf8),
                        pl.Field("city", pl.Utf8),
                        pl.Field("state", pl.Utf8),
                        pl.Field("postal_code", pl.Utf8),
                        pl.Field("street", pl.Utf8),
                        pl.Field("address_norm", pl.Utf8),
                    ]),
                )
            )
            ta_fields = df[temp_address].struct.fields
            new_fields = [f"{col}_{f}" for f in ta_fields]
            df = (
                df.drop(raw_address)
                .with_columns(pl.col(temp_address).struct.rename_fields(new_fields))
                .unnest(temp_address)
                .with_columns(
                    pl.col(f"{col}_postal_code").cast(pl.String).map_elements(clean_zipcode, return_dtype=pl.String),
                    pl.col(f"{col}_address_norm").cast(pl.String).alias(col + "_address"),
                )
                .with_columns(pl.col(col + "_address").replace("", None))
            )

            id_cols = ["address", "street", "street_name"]

            # create id col
            for id_col in id_cols:
                name = col + "_" + id_col
                df = create_id_col(df, name)

        # drop temp cols
        df = df.drop(col + "_address")
    return df

create_id_col(df, col)

Adds an id column to the DataFrame using the pl.Series.hash function.

Returns a pl.DataFrame
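
A small example (assuming the helper is imported from chainlink.load.load_utils):

import polars as pl
from chainlink.load.load_utils import create_id_col

df = pl.DataFrame({"company_name_name": ["ACME LLC", None]})
df = create_id_col(df, "company_name_name")
# adds a UInt64 "company_name_name_id" column: a hash for "ACME LLC", null where the value is null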

Source code in src/chainlink/load/load_utils.py
def create_id_col(df: pl.DataFrame, col: str) -> pl.DataFrame:
    """
    Adds an id column to the DataFrame using pl.Series.hash function

    Returns a pl.DataFrame
    """

    col_id = col + "_id"

    df = df.with_columns(pl.col(col).hash().alias(col_id)).with_columns(
        pl.when(pl.col(col).is_null()).then(None).otherwise(pl.col(col_id)).alias(col_id).cast(pl.UInt64),
    )

    return df

execute_flag_bad_addresses(db_conn, table, address_col, bad_addresses)

Flags rows with bad addresses as provided by user

Source code in src/chainlink/load/load_utils.py
def execute_flag_bad_addresses(db_conn: DuckDBPyConnection, table: str, address_col: str, bad_addresses: list) -> None:
    """
    Flags rows with bad addresses as provided by user
    """
    console.log(f"[yellow] Flagging bad addresses in {table} table for {address_col} column")
    if bad_addresses:
        bad_addresses_tuple = tuple(bad_addresses)

        query = f"""
                CREATE OR REPLACE TABLE {table} AS
                SELECT *,
                        CASE WHEN
                            ({address_col} in {bad_addresses_tuple}
                            OR {address_col}_street in {bad_addresses_tuple}) THEN 1
                        ELSE 0 END as {address_col}_skip
                from {table}
                """

    else:
        query = f"""
            CREATE OR REPLACE TABLE {table} AS
            SELECT *, 0 as {address_col}_skip
            from {table}
            """
        console.log(f"[yellow] No bad addresses to flag in {table} table for {address_col} column")

    db_conn.execute(query)
    return None

load_to_db(df, table_name, db_conn, schema)

Loads a DataFrame into a table in the database.

Parameters

df : pl.DataFrame
    DataFrame to load into the database.
table_name : str
    Name of resulting table in database.
db_conn : object
    Connection object to desired duckdb database.
schema : str
    Name of schema for resulting table in database.

Returns

None
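
A short usage sketch (hypothetical database path, schema, and data; assumes the helper is imported from chainlink.load.load_utils):

import duckdb
import polars as pl
from chainlink.load.load_utils import load_to_db

df = pl.DataFrame({"file_number": ["1"], "company_name": ["ACME LLC"]})

with duckdb.connect("db/linked.db") as conn:
    load_to_db(df=df, table_name="llc", db_conn=conn, schema="corp")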

Source code in src/chainlink/load/load_utils.py
def load_to_db(df: pl.DataFrame, table_name: str, db_conn: DuckDBPyConnection, schema: str) -> None:
    """Loads parquet file into table in database.

    Parameters
    ----------
    filepath : str
        Directory of parquet file to load onto database.
    table_name : str
        Name of resulting table in database.
    db_conn : object
        Connection object to desired duckdb database.
    schema : str
        Name of schema for resulting table in database.

    Returns
    -------
    None
    """
    # no copy needed: duckdb's replacement scan resolves the "df" referenced in the query below from this local variable
    query = f"""
            CREATE SCHEMA IF NOT EXISTS {schema};
            DROP TABLE IF EXISTS {schema}.{table_name};
            CREATE TABLE {schema}.{table_name} AS
               SELECT *
               FROM df;
            """

    db_conn.execute(query)

update_entity_ids(df, entity_id_col, db_conn)

Adds new ids to the entity schema table. If the value is already in the table, it is not added.

Returns None
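
For reference, the routing implied by the suffix checks in the source below, using hypothetical id column names:

# hypothetical id columns and the entity table each one routes to
routing = {
    "company_name_name_id": "entity.name",
    "office_address_address_id": "entity.address",
    "office_address_street_id": "entity.street",
    "office_address_street_name_id": "entity.street_name",
}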

Source code in src/chainlink/load/load_utils.py
def update_entity_ids(df: pl.DataFrame, entity_id_col: str, db_conn: DuckDBPyConnection) -> None:
    """
    Adds new ids to the entity schema table. If the value is already in the table, it is not added.

    Returns None
    """

    split_col = entity_id_col.split("_")
    if "_".join(split_col[-3:]) == "street_name_id":
        entity_table_name = "street_name"
        entity_col = entity_id_col.split("_id")[0]
    elif "_".join(split_col[-2:]) == "street_id":
        entity_table_name = "street"
        entity_col = entity_id_col.split("_id")[0]
    elif "_".join(split_col[-2:]) == "address_id":
        entity_col = entity_id_col.replace("_address_id", "")
        entity_table_name = "address"
    elif "_".join(split_col[-2:]) == "name_id":
        entity_col = entity_id_col.replace("_name_id", "")
        entity_table_name = "name"

    if not check_table_exists(db_conn, "entity", entity_table_name):
        # a check if entity tables doesnt exist, just creates it
        query = f"""
                CREATE SCHEMA IF NOT EXISTS entity;

                CREATE TABLE entity.{entity_table_name} AS
                    SELECT {entity_col} as entity,
                           {entity_id_col} as {entity_table_name + "_id"}
                    from  df
                ;
                """

    else:
        # otherwise, add any new entities to the existing table

        query = f"""
                CREATE OR REPLACE TABLE entity.{entity_table_name} AS (


                SELECT {entity_col} as entity,
                       {entity_id_col} as {entity_table_name + "_id"}
                from  df

                UNION DISTINCT

                select entity,
                       {entity_table_name + "_id"}
                from   entity.{entity_table_name}
                )
                """
    db_conn.execute(query)

    return None

validate_input_data(df, table_config)

Validates input data against configuration requirements

Source code in src/chainlink/load/load_utils.py
def validate_input_data(df: pl.DataFrame, table_config: dict) -> None:
    """
    Validates input data against configuration requirements
    """
    required_columns = set()
    required_columns.add(table_config["id_col_og"])
    if table_config.get("name_cols_og"):
        required_columns.update(table_config["name_cols_og"])
    if table_config.get("address_cols_og"):
        required_columns.update(table_config["address_cols_og"])

    missing_columns = required_columns - set(df.columns)
    if missing_columns:
        raise ValueError(f"Missing required columns: {', '.join(missing_columns)}")

    # Check for empty dataframe
    if df.is_empty():
        raise ValueError("Input data is empty")

    # Check for minimum required non-null values
    for col in required_columns:
        null_count = df.select(pl.col(col).is_null().sum()).item()
        if null_count == len(df):
            raise ValueError(f"Column {col} contains all null values")

main

chainlink(config, config_path=DIR / 'configs/config.yaml')

Given a correctly formatted config file:

* load in any schemas in the config that are not already in the database
* create within links for each new schema
* create across links for each new schema with all existing schemas

Returns true if the database was created successfully.
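
A minimal sketch of the config dict this entry point consumes (paths and column names are hypothetical; the same structure is what validate_config checks):

config = {
    "options": {
        "db_path": "db/linked.db",
        "overwrite_db": False,
        "probabilistic": False,
        "load_only": False,
    },
    "schemas": [
        {
            "schema_name": "corp",
            "tables": [
                {
                    "table_name": "llc",
                    "table_name_path": "data/llc.csv",   # hypothetical file
                    "id_col": "file_number",
                    "name_cols": ["company_name"],
                    "address_cols": ["office_address"],
                }
            ],
        }
    ],
}

chainlink(config, config_path="configs/config.yaml")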

Source code in src/chainlink/main.py
def chainlink(
    config: dict,
    config_path: str | Path = DIR / "configs/config.yaml",
) -> bool:
    """
    Given a correctly formatted config file,
        * load in any schemas in the config that are not already in the database
        * create within links for each new schema
        * create across links for each new schema with all existing schemas


    Returns true if the database was created successfully.
    """
    probabilistic = config["options"].get("probabilistic", False)
    load_only = config["options"].get("load_only", False)
    db_path = config["options"].get("db_path", DIR / "db/linked.db")

    no_names = True
    no_addresses = True

    # create snake case columns
    for schema in config["schemas"]:
        for table in schema["tables"]:
            if len(table["name_cols"]) > 0:
                no_names = False
                table["name_cols_og"] = table["name_cols"]
                table["name_cols"] = [x.lower().replace(" ", "_") for x in table["name_cols"]]
            else:
                table["name_cols"] = []

            if len(table["address_cols"]) > 0:
                no_addresses = False
                table["address_cols_og"] = table["address_cols"]
                table["address_cols"] = [x.lower().replace(" ", "_") for x in table["address_cols"]]
            else:
                table["address_cols"] = []

            table["id_col_og"] = table["id_col"]
            table["id_col"] = table["id_col"].lower().replace(" ", "_")

    # handle options
    overwrite_db = config["options"].get("overwrite_db", False)
    if overwrite_db and os.path.exists(db_path):
        os.remove(db_path)
        console.print(f"[red] Removed existing database at {db_path}")
        logger.info(f"Removed existing database at {db_path}")

    update_config_only = config["options"].get("update_config_only", False)
    if update_config_only:
        update_config(db_path, config, config_path)
        return True

    bad_address_path = config["options"].get("bad_address_path", None)
    if bad_address_path is not None:
        try:
            bad_addresses_df = pl.read_csv(bad_address_path)
            bad_addresses = bad_addresses_df[:, 0].to_list()
        except Exception:
            bad_addresses = []
    else:
        bad_addresses = []

    # list of link exclusions

    link_exclusions = config["options"].get("link_exclusions", None)
    if not link_exclusions:
        link_exclusions = []

    # all columns in db to compare against
    with duckdb.connect(database=db_path, read_only=False) as con:
        df_db_columns = con.sql("show all tables").pl()

    schemas = config["schemas"]
    new_schemas = []

    # load each schema. if schema is a new entity, create links
    for schema_config in schemas:
        schema_name = schema_config["schema_name"]

        # if not force create, check if each col exists, and skip if so
        if not overwrite_db:
            if df_db_columns.filter(pl.col("schema") == schema_name).shape[0] == 0:
                new_schemas.append(schema_name)
        else:
            new_schemas.append(schema_name)

    # load in all new schemas
    for new_schema in new_schemas:
        schema_config = [schema for schema in schemas if schema["schema_name"] == new_schema][0]

        with console.status(f"[bold yellow] Working on loading {new_schema}") as status:
            # load schema
            load_generic(
                db_path=db_path,
                schema_config=schema_config,
                bad_addresses=bad_addresses,
            )

        if not load_only:
            # create exact links
            with console.status(f"[bold yellow] Working on linking {new_schema}") as status:
                create_within_links(
                    db_path=db_path,
                    schema_config=schema_config,
                    link_exclusions=link_exclusions,
                )

    if not load_only and probabilistic:
        #  generate all the fuzzy links and store in entity.name_similarity
        # only if there are new schemas added
        if len(new_schemas) > 0:
            with console.status("[bold yellow] Working on fuzzy matching scores") as status:
                if not no_names:
                    generate_tfidf_links(db_path, table_location="entity.name_similarity")
                if not no_addresses:
                    generate_tfidf_links(
                        db_path,
                        table_location="entity.street_name_similarity",
                        source_table_name="entity.street_name",
                    )

        # for across link
        links = []
        created_schemas = []

        # create tfidf links within each new schema
        for new_schema in new_schemas:
            schema_config = [schema for schema in schemas if schema["schema_name"] == new_schema][0]

            if probabilistic:
                with console.status(f"[bold yellow] Working on fuzzy matching links in {new_schema}") as status:
                    create_tfidf_within_links(
                        db_path=db_path,
                        schema_config=schema_config,
                        link_exclusions=link_exclusions,
                    )

            # also create across links for each new schema
            existing_schemas = [schema for schema in schemas if schema["schema_name"] != new_schema]

            new_schema_config = [schema for schema in schemas if schema["schema_name"] == new_schema][0]

            # make sure we havent already created this link combo
            for schema in existing_schemas:
                if sorted(new_schema + schema["schema_name"]) not in created_schemas:
                    links.append((new_schema_config, schema))
                    created_schemas.append(sorted(new_schema + schema["schema_name"]))

        # across links for each new_schema, link across to all existing entities
        for new_schema_config, existing_schema in links:
            with console.status(
                f"[bold yellow] Working on links between {new_schema_config['schema_name']} and {existing_schema['schema_name']}"
            ) as status:
                create_across_links(
                    db_path=db_path,
                    new_schema=new_schema_config,
                    existing_schema=existing_schema,
                    link_exclusions=link_exclusions,
                )

            if probabilistic:
                with console.status(
                    f"[bold yellow] Working on fuzzy links between {new_schema_config['schema_name']} and {existing_schema['schema_name']}"
                ) as status:
                    create_tfidf_across_links(
                        db_path=db_path,
                        new_schema=new_schema_config,
                        existing_schema=existing_schema,
                        link_exclusions=link_exclusions,
                    )

    update_config(db_path, config, config_path)

    export_tables_flag = config["options"].get("export_tables", False)
    if export_tables_flag:
        path = DIR / "data" / "export"
        export_tables(db_path, path)

    return True  ## TODO: check if this is true or false

main(config=typer.Argument(DIR / 'config' / 'chainlink_config.yaml', exists=True, readable=True))

Given a correctly formatted config file:

* load in any schemas in the config that are not already in the database
* create within links for each new schema
* create across links for each new schema with all existing schemas

Returns 'True' if the database was created successfully.

Source code in src/chainlink/main.py
@app.command()
def main(config: str = typer.Argument(DIR / "config" / "chainlink_config.yaml", exists=True, readable=True)) -> None:
    """
    Given a correctly formatted config file,
        * load in any schemas in the config that are not already in the database
        * create within links for each new schema
        * create across links for each new schema with all existing schemas

    Returns 'True' if the database was created successfully.
    """
    config_dict = load_config(config) if config is not None and os.path.exists(config) else create_config()
    chainlink(config_dict, config_path=config)

    console.print("[green bold] chainlink complete, database created")
    logger.info("chainlink complete, database created")

utils

add_schema_config(config)

Helper to add a schema to an existing config

Source code in src/chainlink/utils.py
def add_schema_config(config: dict) -> dict:
    """
    Helper to add a schema to an existing config
    """

    schema_name = Prompt.ask("[green]> Enter the name of the schema", default="main")
    config["schemas"].append({"schema_name": schema_name, "tables": []})
    config = add_table_config(config, schema_name)
    add_table = Confirm.ask("[green]> Add a table to this schema?", default=True, show_default=True)
    while add_table:
        config = add_table_config(config, schema_name)
        add_table = Confirm.ask(
            "[green]> Add another table to this schema?",
            default=False,
            show_default=True,
        )
    console.print("[green italic]> Schema added successfully!")
    return config

add_table_config(config, schema_name)

Helper to add a table to an existing schema

Source code in src/chainlink/utils.py
def add_table_config(config: dict, schema_name: str) -> dict:
    """
    Helper to add a table to an existing schema
    """

    table_name = Prompt.ask("[green]> Enter the name of dataset:", default="dataset", show_default=True)
    table_name = table_name.lower().replace(" ", "_")
    table_name_path = Prompt.ask("[green]> Enter the path to the dataset")
    while not os.path.exists(table_name_path):
        table_name_path = Prompt.ask("[red]> Path does not exist. Please enter a valid path")
    id_col = Prompt.ask("[green]> Enter the id column of the dataset. Must be unique")
    name_col_str = Prompt.ask("[green]> Enter the name column(s) (comma separated)")
    name_cols = [_.strip() for _ in name_col_str.split(",")]
    address_col_str = Prompt.ask("[green]> Enter the address column(s) (comma separated)")
    address_cols = [_.strip() for _ in address_col_str.split(",")]

    for idx, schema in enumerate(config["schemas"]):
        if schema["schema_name"] == schema_name:
            config["schemas"][idx]["tables"].append({
                "table_name": table_name,
                "table_name_path": table_name_path,
                "id_col": id_col,
                "name_cols": name_cols,
                "address_cols": address_cols,
            })

    return config

check_table_exists(db_conn, schema, table_name)

check if a table exists

Returns: bool

Source code in src/chainlink/utils.py
def check_table_exists(db_conn: DuckDBPyConnection, schema: str, table_name: str) -> bool:
    """
    check if a table exists

    Returns: bool
    """

    db_conn.execute(
        f"""    SELECT COUNT(*)
                FROM   information_schema.tables
                WHERE  table_name = '{table_name}'
                AND    table_schema = '{schema}'"""
    )

    return db_conn.fetchone()[0] == 1

create_config()

Helper to create a config file from user input if one was not created beforehand.

Source code in src/chainlink/utils.py
def create_config() -> dict:
    """
    Helper to create config file from user input if not pre created
    """
    create_config_path = Prompt.ask(
        "[green]> Enter config path. [Leave blank if you would you like to create a new one]",
        default="",
        show_default=False,
    )
    create_config_path = create_config_path.strip()
    if create_config_path.lower() != "":
        while not os.path.exists(create_config_path):
            create_config_path = Prompt.ask("[red]> Yaml path does not exist. Please enter a valid path")
            create_config_path = create_config_path.strip()

        config = load_config(create_config_path)

        while True:
            if validate_config(config):
                break
            else:  # invalid config
                # print(validate_config(config))
                create_config_path = Prompt.ask("[red]> Invalid config. Please enter a valid yaml config")
                create_config_path = create_config_path.strip()
                config = load_config(create_config_path)

        return config
    else:
        config = {
            "options": {
                "overwrite_db": False,
                "export_tables": False,
                "update_config_only": False,
                "link_exclusions": [],
                "bad_address_path": None,
                "probabilistic": False,
                "load_only": False,
            },
            "schemas": [],
        }
        # build config with user input
        config["options"]["db_path"] = Prompt.ask(
            "[green]> Enter the path to the resulting database",
            default="db/linked.db",
            show_default=True,
        )

        config["options"]["load_only"] = Confirm.ask(
            "[green]> Only clean and load data to the database (without matching)?",
            show_default=True,
            default=False,
        )

        if not config["options"]["load_only"]:
            config["options"]["probablistic"] = Confirm.ask(
                "[green]> Run probabilisitic name and address matching?",
                show_default=True,
                default=False,
            )

        config["options"]["export_tables"] = Confirm.ask(
            "[green]> Export tables to parquet after load?",
            show_default=True,
            default=False,
        )

        bad_address_path = Prompt.ask(
            "[dim green]> [Optional] Provide path to bad address csv file",
            default="",
            show_default=False,
        )
        bad_address_path = bad_address_path.strip()
        if bad_address_path:
            while not os.path.exists(bad_address_path):
                console.print("> Bad address path does not exist. Please enter a valid path or leave blank:")
                bad_address_path = input().strip()
            config["options"]["bad_address_path"] = bad_address_path

        add_schema = Confirm.ask("[green]> Add a new schema?", default=True, show_default=True)
        while add_schema:
            config = add_schema_config(config)
            add_schema = Confirm.ask("> Add another schema?", default=False, show_default=True)

        return config

export_tables(db_path, data_path)

export all tables from database to parquet files in {data_path}/export directory

Returns: None

Source code in src/chainlink/utils.py
def export_tables(db_path: str | Path, data_path: str | Path) -> None:
    """
    export all tables from database to parquet files in {data_path}/export directory

    Returns: None
    """

    # create export directory if doesn't exist
    if not os.path.exists(data_path):
        os.makedirs(data_path)

    def find_id_cols(row: dict) -> list:  # TODO: check if this is correct
        if row["schema"] == "link" or row["name"] == "name_similarity":
            return row["column_names"][:2]
        elif row["schema"] == "entity":
            return [row["column_names"][1]]
        else:
            return [row["column_names"][0]]

    with duckdb.connect(db_path) as conn:
        df_db_columns = conn.sql("show all tables").pl()

        df_db_columns = df_db_columns.with_columns(
            schema_table=pl.col("schema") + "." + pl.col("name"),
            id_col=pl.struct(pl.all()).map_elements(lambda x: find_id_cols(x), return_dtype=pl.List(pl.String)),
        )
        link_filter = (pl.col("schema") == "link") | (pl.col("name") == "name_similarity")

        links_to_export = zip(
            df_db_columns.filter(link_filter)["schema_table"].to_list(),
            df_db_columns.filter(link_filter)["id_col"].to_list(),
        )

        for link in links_to_export:
            links_query = f"""
                (select * from {link[0]}
                order by {link[1][0]} ASC, {link[1][1]} ASC);
            """
            d = conn.execute(links_query).pl().cast({link[1][0]: pl.String, link[1][1]: pl.String})
            d.write_parquet(f"{data_path}/{link[0].replace('.', '_')}.parquet")

        main_filter = (pl.col("schema") != "link") & (pl.col("name") != "name_similarity")
        main_to_export = zip(
            df_db_columns.filter(main_filter)["schema_table"].to_list(),
            df_db_columns.filter(main_filter)["id_col"].to_list(),
        )

        for table, id_cols in main_to_export:
            sql_to_exec = f"""
                (select * from {table}
                order by {id_cols[0]} ASC);
            """
            d = conn.execute(sql_to_exec).pl().cast({id_cols[0]: pl.String})
            d.write_parquet(f"{data_path}/{table.replace('.', '_')}.parquet")

    print("Exported all tables!")
    logger.info("Exported all tables!")

load_config(file_path)

load yaml config file, clean up column names

Returns: dict

Source code in src/chainlink/utils.py
def load_config(file_path: str) -> dict:
    """
    load yaml config file, clean up column names

    Returns: dict
    """

    with open(file_path) as file:
        config = yaml.safe_load(file)

    return config

setup_logger(name, log_file, level=logging.DEBUG)

To set up as many loggers as you want

from https://stackoverflow.com/questions/11232230/logging-to-two-files-with-different-settings

Source code in src/chainlink/utils.py
def setup_logger(name: str, log_file: str, level: int | str = logging.DEBUG) -> logging.Logger:
    """
    To setup as many loggers as you want
    # from https://stackoverflow.com/questions/11232230/logging-to-two-files-with-different-settings
    """

    formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")

    handler = logging.FileHandler(log_file)
    handler.setFormatter(formatter)

    logger = logging.getLogger(name)
    logger.setLevel(level)
    logger.addHandler(handler)

    return logger

update_config(db_path, config, config_path)

Updates the config by adding all existing link columns and the last-updated time. Writes the config back out to config.yaml.

Returns: None

Source code in src/chainlink/utils.py
def update_config(db_path: str | Path, config: dict, config_path: str | Path) -> None:
    """
    update config by adding in all existing link columns and last updated time.
    writes config back out to config.yaml

    Returns: None
    """

    with duckdb.connect(db_path) as conn:
        df_db_columns = conn.sql("show all tables").pl()

    all_links = []
    for cols in df_db_columns["column_names"].to_list():
        all_links += [col for col in cols if "match" in col]

    if "metadata" not in config:
        config["metadata"] = {}

    config["metadata"]["existing_links"] = all_links
    config["metadata"]["last_updated"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    with open(config_path, "w+") as f:
        yaml.dump(config, f)

validate_config(config)

Validates the configuration against a schema

Source code in src/chainlink/utils.py
def validate_config(config: dict) -> bool:
    """
    Validates the configuration against a schema
    """
    schema = {
        "type": "object",
        "required": ["options", "schemas"],
        "properties": {
            "options": {
                "type": "object",
                "required": ["db_path"],
                "properties": {
                    "overwrite_db": {"type": "boolean"},
                    "export_tables": {"type": "boolean"},
                    "update_config_only": {"type": "boolean"},
                    "link_exclusions": {"type": ["array", "null"]},  # or none
                    "bad_address_path": {"type": "string"},  # or none
                },
            },
            "schemas": {
                "type": "array",
                "items": {
                    "type": "object",
                    "required": ["schema_name", "tables"],
                    "properties": {
                        "schema_name": {"type": "string"},
                        "tables": {
                            "type": "array",
                            "items": {
                                "type": "object",
                                "required": ["table_name", "table_name_path", "id_col"],
                                "properties": {
                                    "table_name": {"type": "string"},
                                    "table_name_path": {"type": "string"},
                                    "id_col": {"type": "string"},
                                    "name_cols": {
                                        "type": ["array", "null"],
                                        "items": {"type": "string"},
                                    },
                                    "address_cols": {
                                        "type": ["array", "null"],
                                        "items": {"type": "string"},
                                    },
                                },
                            },
                        },
                    },
                },
            },
        },
    }

    try:
        jsonschema.validate(instance=config, schema=schema)
    except jsonschema.exceptions.ValidationError as e:
        console.print(f"[bold red]> Invalid configuration: {e!s}")
        return False

    # ids across tables but within schema should be the same
    for schema in config["schemas"]:
        ids = set()
        for table in schema["tables"]:
            ids.add(table["id_col"])

        if len(ids) != 1:
            console.print(f"[bold red]> All tables in schema {schema['schema_name']} must have the same id column")
            return False

    # no exception
    return True