Wprowadzenie: przygotowywanie danych pod kątem zgodności z RODO
Ogólne rozporządzenie o ochronie danych (RODO) i California Consumer Privacy Act (KPA) to przepisy dotyczące prywatności i zabezpieczeń danych, które wymagają od firm trwałego i całkowitego usunięcia wszystkich danych osobowych zebranych na temat klienta po ich wyraźnym żądaniu. Znane również jako "prawo do zapomnienia" (RTBF) lub "prawo do wymazywania danych", żądania usunięcia muszą być wykonywane w określonym okresie (na przykład w ciągu jednego miesiąca kalendarzowego).
W tym artykule opisano sposób implementowania rtBF na danych przechowywanych w usłudze Databricks. Przykład zawarty w tym artykule modeluje zestawy danych dla firmy zajmującej się handlem elektronicznym i pokazuje, jak usuwać dane w tabelach źródłowych i propagować te zmiany do tabel podrzędnych.
Strategia wdrażania "prawa do zapomnienia"
Na poniższym diagramie pokazano, jak wdrożyć "prawo do zapomnienia".
usuwanie punktu za pomocą usługi Delta Lake
Usługa Delta Lake przyspiesza usuwanie punktów w dużych magazynach danych za pomocą transakcji ACID, co umożliwia lokalizowanie i usuwanie danych osobowych w odpowiedzi na żądania RODO lub KPA konsumentów.
Usługa Delta Lake zachowuje historię tabel i udostępnia je dla zapytań punktowych i wycofywania. Funkcja VACUUM usuwa pliki danych, do których nie odwołuje się tabela delty i są starsze niż określony próg przechowywania, trwale usuwając dane. Aby dowiedzieć się więcej na temat ustawień domyślnych i zaleceń, zobacz Praca z historią tabel usługi Delta Lake.
Uwaga
W przypadku tabel z włączonymi wektorami usuwania należy również uruchomić polecenie REORG TABLE ... APPLY (PURGE)
, aby trwale usunąć rekordy bazowe. Zobacz Zastosowanie zmian do plików danych Parquet.
Usuwanie danych w źródłach nadrzędnych
RODO i KPA mają zastosowanie do wszystkich danych, w tym danych w źródłach spoza usługi Delta Lake, takich jak Kafka, pliki i bazy danych. Oprócz usuwania danych w usłudze Databricks należy również pamiętać o usuwaniu danych w źródłach nadrzędnych, takich jak kolejki i magazyn w chmurze.
Całkowite usunięcie jest bardziej preferowane niż zaciemnianie.
Musisz wybrać między usuwaniem danych i zaciemnianiem ich. Zaciemnianie można zaimplementować przy użyciu pseudonimizacji, maskowania danych itp. Jednak najbezpieczniejszą opcją jest całkowite usunięcie, ponieważ w praktyce wyeliminowanie ryzyka redentyfikacji często wymaga całkowitego usunięcia danych PII.
Usuń dane w warstwie brązowej, a następnie propaguj usunięcia do warstwy srebrnej i złotej.
Zalecamy rozpoczęcie procesu zgodności z RODO i KPSA od usunięcia danych w warstwie brązowej, sterowanego przez zaplanowane zadanie, które wysyła zapytania do tabeli kontrolnej zawierającej żądania usunięcia. Po usunięciu danych z warstwy brązowej zmiany mogą być propagowane do warstwy srebrnej i złotej.
Regularnie konserwuj tabele w celu usunięcia danych z plików historycznych
Domyślnie usługa Delta Lake zachowuje historię tabel, w tym usunięte rekordy, przez 30 dni i udostępnia ją na potrzeby podróży i wycofywania czasu. Ale nawet jeśli poprzednie wersje danych zostaną usunięte, dane są nadal przechowywane w magazynie w chmurze. W związku z tym należy regularnie obsługiwać tabele i widoki, aby usunąć poprzednie wersje danych. Zalecanym sposobem jest optymalizacja predyktywna dla zarządzanych tabel Unity Catalog, która inteligentnie utrzymuje zarówno tabele strumieniowe, jak i zmaterializowane widoki. DLT automatycznie wykonuje zadania konserwacji w ciągu 24 godzin od zaktualizowania tabel przesyłania strumieniowego i zmaterializowanych widoków.
Jeśli nie używasz optymalizacji predykcyjnej lub DLT, należy uruchomić polecenie VACUUM
w tabelach delty, aby trwale usunąć poprzednie wersje danych. Domyślnie spowoduje to ograniczenie możliwości podróży w czasie do 7 dni, co jest konfigurowalnym ustawieniem, i usunie historyczne wersje danych, o których mowa, ze storage'u w chmurze.
Usuń dane PII z warstwy brązowej
W zależności od konstrukcji Twojego lakehouse, możliwe jest zerwanie powiązania między danymi osobowymi a danymi nieosobowymi użytkownika. Na przykład, jeśli używasz klucza nienaturalnego, takiego jak user_id
, zamiast naturalnego klucza, takiego jak adres e-mail, możesz usunąć dane osobowe (PII), pozostawiając dane nieosobowe na miejscu.
Pozostała część artykułu zajmuje się RTBF, całkowicie usuwając rekordy użytkowników ze wszystkich brązowych tabel. Dane można usunąć, wykonując polecenie DELETE
, jak pokazano w poniższym kodzie:
spark.sql("DELETE FROM bronze.users WHERE user_id = 5")
Podczas jednoczesnego usuwania dużej liczby rekordów zalecamy użycie polecenia MERGE
. Poniższy kod zakłada, że masz tabelę sterowania o nazwie gdpr_control_table
zawierającą kolumnę user_id
. Do tej tabeli wstawiasz rekord dla każdego użytkownika, który zażądał "prawa do zapomnienia" w tej tabeli.
Polecenie MERGE
określa warunek dopasowania wierszy. W tym przykładzie rekordy z target_table
są dopasowywane do rekordów w gdpr_control_table
na podstawie user_id
. Jeśli istnieje dopasowanie (na przykład user_id
w target_table
i gdpr_control_table
), wiersz w target_table
zostanie usunięty. Po pomyślnym wykonaniu tego polecenia MERGE
zaktualizuj tabelę sterowania, aby potwierdzić, że żądanie zostało przetworzone.
spark.sql("""
MERGE INTO target
USING (
SELECT user_id
FROM gdpr_control_table
) AS source
ON target.user_id = source.user_id
WHEN MATCHED THEN DELETE
""")
Propagowanie zmian z warstw brązu na warstwy srebra i złota
Po usunięciu danych z warstwy brązowej, należy przekazywać zmiany do tabel w warstwach srebrnej i złotej.
Zmaterializowane widoki: Automatyczna obsługa usuwania
Zmaterializowane widoki automatycznie obsługują usunięcia w źródłach. W związku z tym nie trzeba wykonywać żadnych specjalnych czynności, aby upewnić się, że zmaterializowany widok nie zawiera danych usuniętych ze źródła. Należy odświeżyć zmaterializowany widok i uruchomić czynności konserwacyjne, w celu zapewnienia, że usunięcia są całkowicie przetwarzane.
Zmaterializowany widok zawsze zwraca prawidłowy wynik, ponieważ używa obliczeń przyrostowych, jeśli jest to tańsze niż pełne przeliczenie, ale nigdy kosztem poprawności. Innymi słowy usunięcie danych ze źródła może spowodować, że zmaterializowany widok zostanie w pełni ponownie skompilowany.
Tabele streamingu: usuwanie danych i odczytywanie źródła streamingu przy użyciu skipChangeCommits.
Tabele przesyłania strumieniowego mogą przetwarzać tylko dane dołączane. Oznacza to, że tabele przesyłania strumieniowego oczekują, że w źródle przesyłania strumieniowego pojawią się tylko nowe wiersze danych. Każda inna operacja, taka jak aktualizowanie lub usuwanie dowolnego rekordu z tabeli źródłowej używanej do przesyłania strumieniowego, nie jest obsługiwana i przerywa strumień.
Ponieważ przesyłanie strumieniowe obsługuje tylko nowe dane, musisz samodzielnie obsługiwać zmiany danych. Zalecana metoda to: (1) usunięcie danych w źródle przesyłania strumieniowego, (2) usunięcie danych z tabeli przesyłania strumieniowego, a następnie (3) zaktualizowanie odczytu przesyłania strumieniowego w celu użycia skipChangeCommits
. Ta flaga wskazuje usłudze Databricks, że tabela strumieniowa powinna pomijać wszelkie inne operacje oprócz wstawiania, takie jak aktualizacje lub usuwanie.
Alternatywnie możesz (1) usunąć dane ze źródła, (2) usunąć je z tabeli przesyłania strumieniowego, a następnie (3) w pełni odświeżyć tabelę przesyłania strumieniowego. Gdy w pełni odświeżasz tabelę strumieniową, jej stan strumieniowy zostaje wyczyszczony, a wszystkie dane są ponownie przetwarzane. Wszystkie nadrzędne źródła danych, które wykraczają poza okres przechowywania (na przykład temat platformy Kafka, który starzeje dane po 7 dniach), nie zostaną ponownie przetworzone, co może spowodować utratę danych. Zalecamy tę opcję wyłącznie w przypadku tabel przesyłania strumieniowego, gdy dostępne są dane historyczne i ich ponowne przetwarzanie nie będzie kosztowne.
Tabele Delta: obsługuj operacje usuwania przy użyciu funkcji 'readChangeFeed'
Zwykłe tabele delty nie zawierają żadnej specjalnej obsługi usuwania danych z góry. Zamiast tego należy napisać własny kod, aby usunięcia były do nich propagowane (na przykład spark.readStream.option("readChangeFeed", true).table("source_table")
).
Przykład: zgodność z RODO i CCPA dla firmy zajmującej się handlem elektronicznym
Na poniższym diagramie przedstawiono architekturę medalonu dla firmy zajmującej się handlem elektronicznym, w której należy wdrożyć zgodność z RODO & KODA. Mimo że dane użytkownika są usuwane, warto policzyć ich działania w agregacjach podrzędnych.
-
warstwa brązu
-
users
— wymiary użytkownika. Zawiera dane osobowe (na przykład adres e-mail). -
clickstream
— kliknij zdarzenia. Zawiera dane osobowe (na przykład adres IP). -
gdpr_requests
— tabela kontrolna zawierająca identyfikatory użytkowników z zastrzeżeniem "prawa do zapomnienia".
-
- srebrna warstwa
-
clicks_hourly
— łączna liczba kliknięć na godzinę. Jeśli usuniesz użytkownika, nadal chcesz policzyć swoje kliknięcia. -
clicks_by_user
— łączna liczba kliknięć na użytkownika. Jeśli usuniesz użytkownika, nie chcesz liczyć ich kliknięć.
-
-
warstwa złota
-
revenue_by_user
— łączne wydatki według każdego użytkownika.
-
Krok 1. Wypełnianie tabel przykładowymi danymi
Poniższy kod tworzy te dwie tabele:
-
source_users
zawiera dane wymiarowe dotyczące użytkowników. Ta tabela zawiera kolumnę PII o nazwieemail
. -
source_clicks
zawiera dane zdarzeń dotyczące działań wykonywanych przez użytkowników. Zawiera kolumnę PII o nazwieip_address
.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, MapType, DateType
catalog = "users"
schema = "name"
# Create table containing sample users
users_schema = StructType([
StructField('user_id', IntegerType(), False),
StructField('username', StringType(), True),
StructField('email', StringType(), True),
StructField('registration_date', StringType(), True),
StructField('user_preferences', MapType(StringType(), StringType()), True)
])
users_data = [
(1, 'alice', 'alice@example.com', '2021-01-01', {'theme': 'dark', 'language': 'en'}),
(2, 'bob', 'bob@example.com', '2021-02-15', {'theme': 'light', 'language': 'fr'}),
(3, 'charlie', 'charlie@example.com', '2021-03-10', {'theme': 'dark', 'language': 'es'}),
(4, 'david', 'david@example.com', '2021-04-20', {'theme': 'light', 'language': 'de'}),
(5, 'eve', 'eve@example.com', '2021-05-25', {'theme': 'dark', 'language': 'it'})
]
users_df = spark.createDataFrame(users_data, schema=users_schema)
users_df.write..mode("overwrite").saveAsTable(f"{catalog}.{schema}.source_users")
# Create table containing clickstream (i.e. user activities)
from pyspark.sql.types import TimestampType
clicks_schema = StructType([
StructField('click_id', IntegerType(), False),
StructField('user_id', IntegerType(), True),
StructField('url_clicked', StringType(), True),
StructField('click_timestamp', StringType(), True),
StructField('device_type', StringType(), True),
StructField('ip_address', StringType(), True)
])
clicks_data = [
(1001, 1, 'https://example.com/home', '2021-06-01T12:00:00', 'mobile', '192.168.1.1'),
(1002, 1, 'https://example.com/about', '2021-06-01T12:05:00', 'desktop', '192.168.1.1'),
(1003, 2, 'https://example.com/contact', '2021-06-02T14:00:00', 'tablet', '192.168.1.2'),
(1004, 3, 'https://example.com/products', '2021-06-03T16:30:00', 'mobile', '192.168.1.3'),
(1005, 4, 'https://example.com/services', '2021-06-04T10:15:00', 'desktop', '192.168.1.4'),
(1006, 5, 'https://example.com/blog', '2021-06-05T09:45:00', 'tablet', '192.168.1.5')
]
clicks_df = spark.createDataFrame(clicks_data, schema=clicks_schema)
clicks_df.write.format("delta").mode("overwrite").saveAsTable(f"{catalog}.{schema}.source_clicks")
Krok 2: Stwórz pipeline, który przetwarza dane PII
Poniższy kod tworzy brązowe, srebrne i złote warstwy architektury medalonu pokazanej powyżej.
import dlt
from pyspark.sql.functions import col, concat_ws, count, countDistinct, avg, when, expr
catalog = "users"
schema = "name"
# ----------------------------
# Bronze Layer - Raw Data Ingestion
# ----------------------------
@dlt.table(
name=f"{catalog}.{schema}.users_bronze",
comment='Raw users data loaded from source'
)
def users_bronze():
return (
spark.readStream.table(f"{catalog}.{schema}.source_users")
)
@dlt.table(
name=f"{catalog}.{schema}.clicks_bronze",
comment='Raw clicks data loaded from source'
)
def clicks_bronze():
return (
spark.readStream.table(f"{catalog}.{schema}.source_clicks")
)
# ----------------------------
# Silver Layer - Data Cleaning and Enrichment
# ----------------------------
@dlt.table(
name=f"{catalog}.{schema}.users_silver",
comment='Cleaned and standardized users data'
)
@dlt.expect_or_drop('valid_email', "email IS NOT NULL")
def users_silver():
return (
spark.readStream
.table(f"{catalog}.{schema}.users_bronze")
.withColumn('registration_date', col('registration_date').cast('timestamp'))
.dropDuplicates(['user_id', 'registration_date'])
.select('user_id', 'username', 'email', 'registration_date', 'user_preferences')
)
@dlt.table(
name=f"{catalog}.{schema}.clicks_silver",
comment='Cleaned and standardized clicks data'
)
@dlt.expect_or_drop('valid_click_timestamp', "click_timestamp IS NOT NULL")
def clicks_silver():
return (
spark.readStream
.table(f"{catalog}.{schema}.clicks_bronze")
.withColumn('click_timestamp', col('click_timestamp').cast('timestamp'))
.withWatermark('click_timestamp', '10 minutes')
.dropDuplicates(['click_id'])
.select('click_id', 'user_id', 'url_clicked', 'click_timestamp', 'device_type', 'ip_address')
)
@dlt.table(
name=f"{catalog}.{schema}.user_clicks_silver",
comment='Joined users and clicks data on user_id'
)
def user_clicks_silver():
# Read users_silver as a static DataFrame
users = spark.read.table(f"{catalog}.{schema}.users_silver")
# Read clicks_silver as a streaming DataFrame
clicks = spark.readStream \
.table('clicks_silver')
# Perform the join
joined_df = clicks.join(users, on='user_id', how='inner')
return joined_df
# ----------------------------
# Gold Layer - Aggregated and Business-Level Data
# ----------------------------
@dlt.table(
name=f"{catalog}.{schema}.user_behavior_gold",
comment='Aggregated user behavior metrics'
)
def user_behavior_gold():
df = spark.read.table(f"{catalog}.{schema}.user_clicks_silver")
return (
df.groupBy('user_id')
.agg(
count('click_id').alias('total_clicks'),
countDistinct('url_clicked').alias('unique_urls')
)
)
@dlt.table(
name=f"{catalog}.{schema}.marketing_insights_gold",
comment='User segments for marketing insights'
)
def marketing_insights_gold():
df = spark.read.table(f"{catalog}.{schema}.user_behavior_gold")
return (
df.withColumn(
'engagement_segment',
when(col('total_clicks') >= 100, 'High Engagement')
.when((col('total_clicks') >= 50) & (col('total_clicks') < 100), 'Medium Engagement')
.otherwise('Low Engagement')
)
)
Krok 3. Usuwanie danych w tabelach źródłowych
W tym kroku usuniesz dane we wszystkich tabelach, w których znaleziono dane osobowe.
catalog = "users"
schema = "name"
def apply_gdpr_delete(user_id):
tables_with_pii = ["clicks_bronze", "users_bronze", "clicks_silver", "users_silver", "user_clicks_silver"]
for table in tables_with_pii:
print(f"Deleting user_id {user_id} from table {table}")
spark.sql(f"""
DELETE FROM {catalog}.{schema}.{table}
WHERE user_id = {user_id}
""")
Krok 4. Dodawanie polecenia skipChangeCommits do definicji tabel przesyłania strumieniowego, których dotyczy problem
W tym kroku należy poinstruować DLT, aby pomijało wiersze, które nie są dołączane. Dodaj opcję skipChangeCommits do następujących metod. Nie musisz aktualizować definicji zmaterializowanych widoków, ponieważ będą one automatycznie obsługiwać aktualizacje i usuwać:
users_bronze
users_silver
clicks_bronze
clicks_silver
user_clicks_silver
Poniższy kod pokazuje, jak zaktualizować metodę users_bronze
:
def users_bronze():
return (
spark.readStream.option('skipChangeCommits', 'true').table(f"{catalog}.{schema}.source_users")
)
Gdy ponownie uruchomisz potok, zostanie on pomyślnie zaktualizowany.