Kopieren und Transformieren von Daten in Snowflake V1 mithilfe von Azure Data Factory oder Azure Synapse Analytics
GILT FÜR: Azure Data Factory
Azure Synapse Analytics
Tipp
Testen Sie Data Factory in Microsoft Fabric, eine All-in-One-Analyselösung für Unternehmen. Microsoft Fabric deckt alle Aufgaben ab, von der Datenverschiebung bis hin zu Data Science, Echtzeitanalysen, Business Intelligence und Berichterstellung. Erfahren Sie, wie Sie kostenlos eine neue Testversion starten!
In diesem Artikel wird beschrieben, wie Sie Daten mithilfe der Copy-Aktivität in Azure Data Factory- und Azure Synapse-Pipelines aus und in Snowflake kopieren sowie Daten mithilfe des Tools „Datenfluss“ in Snowflake transformieren. Weitere Informationen finden Sie im Einführungsartikel zu Data Factory oder Azure Synapse Analytics.
Wichtig
Der Snowflake V2-Connector bietet verbesserte native Snowflake-Unterstützung. Wenn Sie den Snowflake V1-Connector in Ihrer Lösung verwenden, sollten Sie möglichst bald ein Upgrade Ihres Snowflake-Connectors durchführen. Ausführliche Informationen zu den Unterschieden zwischen V2 und V1 finden Sie in diesem Abschnitt.
Unterstützte Funktionen
Für den Snowflake-Connector werden die folgenden Funktionen unterstützt:
Unterstützte Funktionen | IR |
---|---|
Kopieraktivität (Quelle/Senke) | ① ② |
Zuordnungsdatenfluss (Quelle/Senke) | ① |
Lookup-Aktivität | ① ② |
Skriptaktivität | ① ② |
① Azure Integration Runtime ② Selbstgehostete Integration Runtime
Für die Kopieraktivität unterstützt dieser Snowflake-Connector die folgenden Funktionen:
- Kopieren von Daten aus Snowflake unter Verwendung des Snowflake-Befehls COPY into [Speicherort], um optimale Leistung zu erzielen.
- Kopieren von Daten in Snowflake unter Verwendung des Snowflake-Befehls COPY into [Tabelle], um optimale Leistung zu erzielen. Snowflake in Azure wird unterstützt.
- Wenn für einen Proxy eine Verbindung mit Snowflake über eine selbstgehostete Integration Runtime erforderlich ist, müssen Sie die Umgebungsvariablen HTTP_PROXY und HTTPS_PROXY auf dem Integration Runtime-Host konfigurieren.
Voraussetzungen
Wenn sich Ihr Datenspeicher in einem lokalen Netzwerk, in einem virtuellen Azure-Netzwerk oder in einer virtuellen privaten Amazon-Cloud befindet, müssen Sie eine selbstgehostete Integration Runtime konfigurieren, um eine Verbindung herzustellen. Die von der selbstgehosteten Integration Runtime verwendeten IP-Adressen müssen der Positivliste hinzugefügt werden.
Handelt es sich bei Ihrem Datenspeicher um einen verwalteten Clouddatendienst, können Sie die Azure Integration Runtime verwenden. Ist der Zugriff auf IP-Adressen beschränkt, die in den Firewallregeln genehmigt sind, können Sie Azure Integration Runtime-IP-Adressen zur Positivliste hinzufügen.
Das für „Quelle“ oder „Senke“ verwendete Snowflake-Konto benötigt USAGE
-Zugriff auf die Datenbank und Lese-/Schreibzugriff auf das Schema und die zugehörigen Tabellen/Sichten. Außerdem muss es die Berechtigung CREATE STAGE
für das Schema haben, um die Phase „Extern“ mit SAS-URI erstellen zu können.
Die folgenden Werte für Kontoeigenschaften müssen festgelegt werden
Eigenschaft | Beschreibung | Erforderlich | Standard |
---|---|---|---|
REQUIRE_STORAGE_INTEGRATION_FOR_STAGE_CREATION | Gibt an, ob bei der Erstellung einer benannten externen Phase (mit CREATE STAGE) ein Speicherintegrationsobjekt als Anmeldeinformationen für den Zugriff auf einen privaten Cloudspeicherort benötigt wird. | FALSE | FALSE |
REQUIRE_STORAGE_INTEGRATION_FOR_STAGE_OPERATION | Gibt an, ob eine benannte externe Phase, die auf ein Speicherintegrationsobjekt verweist, als Anmeldeinformationen für die Cloud verwendet werden soll, wenn Daten aus einem privaten Cloudspeicherort geladen oder in diesen entladen werden. | FALSE | FALSE |
Weitere Informationen zu den von Data Factory unterstützten Netzwerksicherheitsmechanismen und -optionen finden Sie unter Datenzugriffsstrategien.
Erste Schritte
Sie können eines der folgenden Tools oder SDKs verwenden, um die Kopieraktivität mit einer Pipeline zu verwenden:
- Das Tool „Daten kopieren“
- Azure-Portal
- Das .NET SDK
- Das Python SDK
- Azure PowerShell
- Die REST-API
- Die Azure Resource Manager-Vorlage
Erstellen eines verknüpften Diensts mit Snowflake über die Benutzeroberfläche
Verwenden Sie die folgenden Schritte, um einen verknüpften Dienst mit Snowflake auf der Azure-Portal Benutzeroberfläche zu erstellen.
Navigieren Sie in Ihrem Azure Data Factory- oder Synapse-Arbeitsbereich zu der Registerkarte „Verwalten“, wählen Sie „Verknüpfte Dienste“ aus und klicken Sie dann auf „Neu“:
Suchen Sie nach Snowflake, und wählen Sie dann den Snowflake-Connector aus.
Konfigurieren Sie die Dienstdetails, testen Sie die Verbindung, und erstellen Sie den neuen verknüpften Dienst.
Details zur Connector-Konfiguration
Die folgenden Abschnitte enthalten Details zu Eigenschaften, die zum Definieren von Entitäten speziell für einen Snowflake-Connector verwendet werden.
Eigenschaften des verknüpften Diensts
Dieser Snowflake-Connector unterstützt die folgenden Authentifizierungstypen. Weitere Informationen finden Sie in den entsprechenden Abschnitten.
Standardauthentifizierung
Die folgenden Eigenschaften werden für den mit Snowflake verknüpften Dienst unterstützt, wenn die Standardauthentifizierung verwendet wird.
Eigenschaft | Beschreibung | Erforderlich |
---|---|---|
type | Die „type“-Eigenschaft muss auf Snowflake festgelegt werden. | Ja |
connectionString | Gibt die erforderlichen Informationen für Verbindungen mit der Snowflake-Instanz an. Sie können das Kennwort oder die gesamte Verbindungszeichenfolge in Azure Key Vault speichern. Ausführlichere Informationen finden Sie in den Beispielen unter der Tabelle und im Artikel Speichern von Anmeldeinformationen in Azure Key Vault. Einige typische Einstellungen: - Kontoname: Der vollständige Kontoname Ihres Snowflake-Kontos (einschließlich zusätzlicher Segmente zur Identifizierung der Region und Cloudplattform), z. B. „xy12345.east-us-2.azure“. - Benutzername: Der Anmeldename des Benutzers für die Verbindung. - Kennwort: Das Kennwort für den Benutzer. - Datenbank: Die Standarddatenbank, die nach dem Herstellen der Verbindung verwendet werden soll. Dabei sollte es sich um eine vorhandene Datenbank handeln, für die die angegebene Rolle über die erforderlichen Berechtigungen verfügt. - Warehouse: Das virtuelle Warehouse, das nach dem Herstellen der Verbindung verwendet werden soll. Dabei sollte es sich um ein vorhandenes Warehouse handeln, für das die angegebene Rolle über die erforderlichen Berechtigungen verfügt. - Rolle: Die Standardrolle aus der Zugriffssteuerung, die in der Snowflake-Sitzung verwendet werden soll. Die angegebene Rolle sollte vorhanden und dem angegebenen Benutzer bereits zugewiesen sein. Die Standardrolle ist PUBLIC. |
Ja |
authenticationType | Legen Sie diese Eigenschaft auf Basic fest. | Ja. |
connectVia | Die Integration Runtime, die zum Herstellen einer Verbindung mit dem Datenspeicher verwendet wird. Sie können die Azure Integration Runtime oder eine selbstgehostete Integration Runtime verwenden (wenn sich der Datenspeicher in einem privaten Netzwerk befindet). Wenn kein Wert angegeben ist, wird die standardmäßige Azure Integration Runtime verwendet. | Nein |
Beispiel:
{
"name": "SnowflakeLinkedService",
"properties": {
"type": "Snowflake",
"typeProperties": {
"authenticationType": "Basic",
"connectionString": "jdbc:snowflake://<accountname>.snowflakecomputing.com/?user=<username>&password=<password>&db=<database>&warehouse=<warehouse>&role=<myRole>"
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
Kennwort in Azure Key Vault:
{
"name": "SnowflakeLinkedService",
"properties": {
"type": "Snowflake",
"typeProperties": {
"authenticationType": "Basic",
"connectionString": "jdbc:snowflake://<accountname>.snowflakecomputing.com/?user=<username>&db=<database>&warehouse=<warehouse>&role=<myRole>",
"password": {
"type": "AzureKeyVaultSecret",
"store": {
"referenceName": "<Azure Key Vault linked service name>",
"type": "LinkedServiceReference"
},
"secretName": "<secretName>"
}
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
Dataset-Eigenschaften
Eine vollständige Liste mit den Abschnitten und Eigenschaften, die zum Definieren von Datasets zur Verfügung stehen, finden Sie im Artikel zu Datasets.
Folgende Eigenschaften werden für das Snowflake-Dataset unterstützt.
Eigenschaft | Beschreibung | Erforderlich |
---|---|---|
type | Die „type“-Eigenschaft des Datasets muss auf SnowflakeTable festgelegt werden. | Ja |
schema | Name des Schemas. Beachten Sie, das beim Schemanamen die Groß- und Kleinschreibung beachtet wird. | Quelle: Nein, Senke: Ja |
table | Name der Tabelle/Ansicht. Beachten Sie, dann beim Tabellennamen wird die Groß- und Kleinschreibung beachtet wird. | Quelle: Nein, Senke: Ja |
Beispiel:
{
"name": "SnowflakeDataset",
"properties": {
"type": "SnowflakeTable",
"typeProperties": {
"schema": "<Schema name for your Snowflake database>",
"table": "<Table name for your Snowflake database>"
},
"schema": [ < physical schema, optional, retrievable during authoring > ],
"linkedServiceName": {
"referenceName": "<name of linked service>",
"type": "LinkedServiceReference"
}
}
}
Eigenschaften der Kopieraktivität
Eine vollständige Liste mit den Abschnitten und Eigenschaften zum Definieren von Aktivitäten finden Sie im Artikel Pipelines. Dieser Abschnitt enthält eine Liste der Eigenschaften, die von der Snowflake-Quelle und -Senke unterstützt werden.
Snowflake als Quelle
Der Snowflake-Connector verwendet den Snowflake-Befehl COPY into [Speicherort], um optimale Leistung zu erzielen.
Wenn der Senkendatenspeicher und das Format vom Snowflake-Befehl „COPY“ nativ unterstützt werden, können Sie mit der Kopieraktivität Kopiervorgänge direkt aus Snowflake in die Senke durchführen. Details finden Sie unter Direktes Kopieren aus Snowflake. Verwenden Sie andernfalls das integrierte gestaffelte Kopieren aus Snowflake.
Beim Kopieren von Daten aus Snowflake werden die folgenden Eigenschaften im Abschnitt source der Kopieraktivität unterstützt.
Eigenschaft | Beschreibung | Erforderlich |
---|---|---|
type | Die „type“-Eigenschaft der Quelle der Kopieraktivität muss auf SnowflakeSource festgelegt werden. | Ja |
Abfrage | Gibt die SQL-Abfrage an, mit der Daten aus Snowflake gelesen werden. Wenn der Name des Schemas, der Tabelle und Spalten Kleinbuchstaben enthält, geben Sie den Objektbezeichner in der Abfrage an, z. B. select * from "schema"."myTable" .Die Ausführung der gespeicherten Prozedur wird nicht unterstützt. |
No |
exportSettings | Erweiterte Einstellungen, die zum Abrufen von Daten aus Snowflake verwendet werden. Sie können die vom Befehl „COPY into“ unterstützten Einstellungen konfigurieren, die der Dienst beim Aufrufen der Anweisung durchläuft. | Ja |
Unter exportSettings : |
||
type | Der Typ des Exportbefehls, festgelegt auf SnowflakeExportCopyCommand. | Ja |
additionalCopyOptions | Zusätzliche Kopieroptionen, die als Wörterbuch mit Schlüssel-Wert-Paaren bereitgestellt werden. Beispiele: MAX_FILE_SIZE, OVERWRITE. Weitere Informationen finden Sie unter Snowflake Copy Options (Snowflake-Kopieroptionen). | Nein |
additionalFormatOptions | Zusätzliche Dateiformatoptionen, die im Befehl „COPY“ als Wörterbuch mit Schlüssel-Wert-Paaren bereitgestellt werden. Beispiele: DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT. Weitere Informationen finden Sie unter Snowflake Format Type Options (Snowflake-Formattypoptionen). | Nein |
Hinweis
Stellen Sie sicher, dass Sie die Berechtigung haben, den folgenden Befehl auszuführen sowie auf das Schema INFORMATION_SCHEMA und die Tabelle COLUMNS zugreifen zu können.
COPY INTO <location>
Direktes Kopieren aus Snowflake
Wenn der Senkendatenspeicher und das Format die in diesem Abschnitt beschriebenen Kriterien erfüllen, können Sie mit der Kopieraktivität Kopiervorgänge direkt aus Snowflake in die Senke durchführen. Der Dienst überprüft die Einstellungen und gibt bei der Copy-Aktivitätsausführung einen Fehler aus, wenn die folgenden Kriterien nicht erfüllt werden:
Der verknüpfte Senkendienst ist Azure Blob Storage mit SAS-Authentifizierung. Wenn Sie Daten direkt im folgenden unterstützten Format in Azure Data Lake Storage Gen2 kopieren möchten, können Sie einen verknüpften Azure Blob-Dienst mit SAS-Authentifizierung für Ihr ADLS Gen2-Konto erstellen. Damit vermeiden Sie gestaffelte Kopiervorgänge aus Snowflake.
Das Senkendatenformat lautet Parquet, Durch Trennzeichen getrennter Text oder JSON mit den folgenden Konfigurationen:
- Beim Format Parquet wird einer der Komprimierungscodecs None, Snappy oder Lzo verwendet.
- Beim Format Durch Trennzeichen getrennter Text:
-
rowDelimiter
ist \r\n oder ein beliebiges einzelnes Zeichen. -
compression
kann auf keine Komprimierung oder auf gzip, bzip2 oder deflate festgelegt sein. -
encodingName
wird als Standardwert übernommen oder ist auf utf-8 festgelegt. -
quoteChar
ist als doppeltes Anführungszeichen, einfaches Anführungszeichen oder als leere Zeichenfolge (kein Anführungszeichen) festgelegt.
-
- Für das JSON-Format unterstützt direktes Kopieren nur den Fall, dass die Snowflake-Quelltabelle oder das Abfrageergebnis nur eine einzelne Spalte hat und der Datentyp dieser Spalte VARIANT, OBJECT oder ARRAY ist.
-
compression
kann auf keine Komprimierung oder auf gzip, bzip2 oder deflate festgelegt sein. -
encodingName
wird als Standardwert übernommen oder ist auf utf-8 festgelegt. -
filePattern
in der Senke der Kopieraktivität wird auf dem Standardwert belassen oder auf setOfObjects festgelegt.
-
In der Quelle der Copy-Aktivität ist
additionalColumns
nicht angegeben.Die Spaltenzuordnung ist nicht angegeben.
Beispiel:
"activities":[
{
"name": "CopyFromSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<Snowflake input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "SnowflakeSource",
"query": "SELECT * FROM MYTABLE",
"exportSettings": {
"type": "SnowflakeExportCopyCommand",
"additionalCopyOptions": {
"MAX_FILE_SIZE": "64000000",
"OVERWRITE": true
},
"additionalFormatOptions": {
"DATE_FORMAT": "'MM/DD/YYYY'"
}
}
},
"sink": {
"type": "<sink type>"
}
}
}
]
Gestaffeltes Kopieren aus Snowflake
Wenn der Senkendatenspeicher oder das Format nicht nativ mit dem Snowflake-Befehl „COPY“ kompatibel ist (wie im letzten Abschnitt beschrieben), aktivieren Sie mithilfe einer Azure Blob Storage-Zwischeninstanz das integrierte gestaffelte Kopieren. Das Feature für gestaffeltes Kopieren bietet Ihnen auch einen höheren Durchsatz. Der Dienst exportiert Daten aus Snowflake in den Stagingspeicher, kopiert dann die Daten in die Senke und bereinigt schließlich die temporären Daten aus dem Stagingspeicher. Ausführliche Informationen zum Kopieren von Daten mithilfe von Staging finden Sie unter Gestaffeltes Kopieren.
Um diese Funktion verwenden zu können, erstellen Sie einen mit Azure Blob Storage verknüpften Dienst, der auf das Azure Storage-Konto als Stagingzwischenspeicher verweist. Geben Sie dann die Eigenschaften enableStaging
und stagingSettings
in der Kopieraktivität an.
Hinweis
Der mit Azure Blob Storage verknüpfte Stagingdienst muss die SAS-Authentifizierung (Shared Access Signature) verwenden, die für den Snowflake-Befehl „COPY“ erforderlich ist. Erteilen Sie Snowflake die erforderlichen Zugriffsberechtigungen in Azure Blob Storage mit Staging. Weitere Informationen hierzu finden Sie in diesem Artikel.
Beispiel:
"activities":[
{
"name": "CopyFromSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<Snowflake input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "SnowflakeSource",
"query": "SELECT * FROM MyTable",
"exportSettings": {
"type": "SnowflakeExportCopyCommand"
}
},
"sink": {
"type": "<sink type>"
},
"enableStaging": true,
"stagingSettings": {
"linkedServiceName": {
"referenceName": "MyStagingBlob",
"type": "LinkedServiceReference"
},
"path": "mystagingpath"
}
}
}
]
Snowflake als Senke
Der Snowflake-Connector verwendet den Snowflake-Befehl COPY into [Tabelle], um optimale Leistung zu erzielen. Er unterstützt das Schreiben von Daten in Snowflake in Azure.
Wenn der Quelldatenspeicher und das Format vom Snowflake-Befehl „COPY“ nativ unterstützt werden, können Sie mit der Kopieraktivität Kopiervorgänge direkt aus der Quelle in Snowflake durchführen. Details finden Sie unter Direktes Kopieren in Snowflake. Verwenden Sie andernfalls das integrierte gestaffelte Kopieren in Snowflake.
Beim Kopieren von Daten in Snowflake werden die folgenden Eigenschaften im Abschnitt sink der Kopieraktivität unterstützt.
Eigenschaft | Beschreibung | Erforderlich |
---|---|---|
type | Die „type“-Eigenschaft der Senke der Kopieraktivität, festgelegt auf SnowflakeSink. | Ja |
preCopyScript | Geben Sie eine SQL-Abfrage an, die bei jeder Ausführung von der Kopieraktivität ausgeführt werden soll, bevor Daten in Snowflake geschrieben werden. Sie können diese Eigenschaft nutzen, um vorab geladene Daten zu bereinigen. | Nein |
importSettings | Erweiterte Einstellungen, die zum Schreiben von Daten in Snowflake verwendet werden. Sie können die vom Befehl „COPY into“ unterstützten Einstellungen konfigurieren, die der Dienst beim Aufrufen der Anweisung durchläuft. | Ja |
Unter importSettings : |
||
type | Der Typ des Importbefehls, festgelegt auf SnowflakeImportCopyCommand. | Ja |
additionalCopyOptions | Zusätzliche Kopieroptionen, die als Wörterbuch mit Schlüssel-Wert-Paaren bereitgestellt werden. Beispiele: ON_ERROR, FORCE, LOAD_UNCERTAIN_FILES. Weitere Informationen finden Sie unter Snowflake Copy Options (Snowflake-Kopieroptionen). | Nein |
additionalFormatOptions | Zusätzliche Dateiformatoptionen, die im Befehl „COPY“ als Wörterbuch mit Schlüssel-Wert-Paaren bereitgestellt werden. Beispiele: DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT. Weitere Informationen finden Sie unter Snowflake Format Type Options (Snowflake-Formattypoptionen). | Nein |
Hinweis
Stellen Sie sicher, dass Sie die Berechtigung haben, den folgenden Befehl auszuführen sowie auf das Schema INFORMATION_SCHEMA und die Tabelle COLUMNS zugreifen zu können.
SELECT CURRENT_REGION()
COPY INTO <table>
SHOW REGIONS
CREATE OR REPLACE STAGE
DROP STAGE
Direktes Kopieren in Snowflake
Wenn der Quelldatenspeicher und das Format die in diesem Abschnitt beschriebenen Kriterien erfüllen, können Sie mit der Kopieraktivität Kopiervorgänge direkt aus der Quelle in Snowflake durchführen. Der Dienst überprüft die Einstellungen und gibt bei der Copy-Aktivitätsausführung einen Fehler aus, wenn die folgenden Kriterien nicht erfüllt werden:
Der verknüpfte Quelldienst ist Azure Blob Storage mit SAS-Authentifizierung. Wenn Sie Daten direkt im folgenden unterstützten Format aus Azure Data Lake Storage Gen2 kopieren möchten, können Sie einen verknüpften Azure Blob-Dienst mit SAS-Authentifizierung für Ihr ADLS Gen2-Konto erstellen. Damit vermeiden Sie gestaffelte Kopiervorgänge nach Snowflake.
Das Quelldatenformat lautet Parquet, Durch Trennzeichen getrennter Text oder JSON mit den folgenden Konfigurationen:
Beim Format Parquet wird einer der Komprimierungscodecs None oder Snappy verwendet.
Beim Format Durch Trennzeichen getrennter Text:
-
rowDelimiter
ist \r\n oder ein beliebiges einzelnes Zeichen. Wenn das Zeilentrennzeichen nicht „\r\n“ ist, mussfirstRowAsHeader
auf false festgelegt sein.skipLineCount
ist nicht angegeben. -
compression
kann auf keine Komprimierung oder auf gzip, bzip2 oder deflate festgelegt sein. -
encodingName
wird als Standardwert übernommen oder ist auf „UTF-8“, „UTF-16“, „UTF-16BE“, „UTF-32“, „UTF-32BE“, „BIG5“, „EUC-JP“, „EUC-KR“, „GB18030“, „ISO-2022-JP“, „ISO-2022-KR“, „ISO-8859-1“, „ISO-8859-2“, „ISO-8859-5“, „ISO-8859-6“, „ISO-8859-7“, „ISO-8859-8“, „ISO-8859-9“, „WINDOWS-1250“, „WINDOWS-1251“, „WINDOWS-1252“, „WINDOWS-1253“, „WINDOWS-1254“ oder „WINDOWS-1255“ festgelegt. -
quoteChar
ist als doppeltes Anführungszeichen, einfaches Anführungszeichen oder als leere Zeichenfolge (kein Anführungszeichen) festgelegt.
-
Für das JSON-Format unterstützt direktes Kopieren nur den Fall, dass die Snowflake-Senkentabelle nur eine einzelne Spalte hat und der Datentyp dieser Spalte VARIANT, OBJECT oder ARRAY ist.
-
compression
kann auf keine Komprimierung oder auf gzip, bzip2 oder deflate festgelegt sein. -
encodingName
wird als Standardwert übernommen oder ist auf utf-8 festgelegt. - Die Spaltenzuordnung ist nicht angegeben.
-
In der Quelle der Kopieraktivität:
-
additionalColumns
ist nicht angegeben. - Wenn es sich bei der Quelle um einen Ordner handelt, wird
recursive
auf TRUE festgelegt. -
prefix
,modifiedDateTimeStart
,modifiedDateTimeEnd
undenablePartitionDiscovery
wurden nicht angegeben.
-
Beispiel:
"activities":[
{
"name": "CopyToSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<Snowflake output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "<source type>"
},
"sink": {
"type": "SnowflakeSink",
"importSettings": {
"type": "SnowflakeImportCopyCommand",
"copyOptions": {
"FORCE": "TRUE",
"ON_ERROR": "SKIP_FILE"
},
"fileFormatOptions": {
"DATE_FORMAT": "YYYY-MM-DD"
}
}
}
}
}
]
Gestaffeltes Kopieren in Snowflake
Wenn der Quelldatenspeicher oder das Format nicht nativ mit dem Snowflake-Befehl „COPY“ kompatibel ist (wie im letzten Abschnitt beschrieben), aktivieren Sie mithilfe einer Azure Blob Storage-Zwischeninstanz das integrierte gestaffelte Kopieren. Das Feature für gestaffeltes Kopieren bietet Ihnen auch einen höheren Durchsatz. Der Dienst konvertiert die Daten automatisch, damit das Datenformat den Anforderungen von Snowflake entspricht. Dann wird der Befehl „COPY“ aufgerufen, um die Daten in Snowflake zu laden. Abschließend werden Sie die temporären Daten in Blob Storage bereinigt. Ausführliche Informationen zum Kopieren von Daten mithilfe von Staging finden Sie unter Gestaffeltes Kopieren.
Um diese Funktion verwenden zu können, erstellen Sie einen mit Azure Blob Storage verknüpften Dienst, der auf das Azure Storage-Konto als Stagingzwischenspeicher verweist. Geben Sie dann die Eigenschaften enableStaging
und stagingSettings
in der Kopieraktivität an.
Hinweis
Der mit Azure Blob Storage verknüpfte Stagingdienst muss die SAS-Authentifizierung (Shared Access Signature) verwenden, die für den Snowflake-Befehl „COPY“ erforderlich ist.
Beispiel:
"activities":[
{
"name": "CopyToSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<Snowflake output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "<source type>"
},
"sink": {
"type": "SnowflakeSink",
"importSettings": {
"type": "SnowflakeImportCopyCommand"
}
},
"enableStaging": true,
"stagingSettings": {
"linkedServiceName": {
"referenceName": "MyStagingBlob",
"type": "LinkedServiceReference"
},
"path": "mystagingpath"
}
}
}
]
Eigenschaften von Mapping Data Flow
Beim Transformieren von Daten im Zuordnungsdatenfluss können Sie in Snowflake Tabellen lesen und in diese schreiben. Weitere Informationen finden Sie unter Quellentransformation und Senkentransformation in Zuordnungsdatenflüssen. Sie können ein Snowflake-Dataset oder Inlinedataset als Quell- und Senkentyp verwenden.
Quellentransformation
In der folgenden Tabelle sind die von einer Snowflake-Quelle unterstützten Eigenschaften aufgeführt. Sie können diese Eigenschaften auf der Registerkarte Quelloptionen bearbeiten. Der Connector verwendet die interne Datenübertragung von Snowflake.
Name | BESCHREIBUNG | Erforderlich | Zulässige Werte | Datenflussskript-Eigenschaft |
---|---|---|---|---|
Tabelle | Wenn Sie „Tabelle“ als Eingabe auswählen, ruft der Datenfluss alle Daten aus der Tabelle ab, die im Snowflake-Dataset oder bei Verwendung des Inlinedatasets in den Quelloptionen angegeben ist. | Nein | String |
(nur für Inlinedataset) tableName schemaName |
Abfrage | Wenn Sie „Abfrage“ als Eingabe auswählen, geben Sie eine Abfrage zum Abrufen von Daten aus Snowflake ein. Diese Einstellung überschreibt jede Tabelle, die Sie im Dataset ausgewählt haben. Wenn der Name des Schemas, der Tabelle und Spalten Kleinbuchstaben enthält, geben Sie den Objektbezeichner in der Abfrage an, z. B. select * from "schema"."myTable" . |
Nein | String | Abfrage |
Inkrementelle Extrahierung aktivieren (Vorschau) | Verwenden Sie diese Option, um ADF mitzuteilen, dass nur Zeilen verarbeitet werden sollen, die seit der letzten Ausführung der Pipeline geändert wurden. | Nein | Boolean | enableCdc |
Inkrementelle Spalte | Wenn Sie das Feature für das inkrementelle Extrahieren verwenden, müssen Sie die Datums-/Uhrzeit-/numerische Spalte auswählen, die Sie als Grenzwert in der Quelltabelle verwenden möchten. | Nein | String | waterMarkColumn |
Aktivieren der Snowflake-Änderungsnachverfolgung (Vorschau) | Mit dieser Option kann ADF die Snowflake-Change Data Capture-Technologie nutzen, um nur die Deltadaten seit der vorherigen Pipelineausführung zu verarbeiten. Mit dieser Option werden die Deltadaten automatisch mit Vorgängen für Zeileneinfügung, Zeilenaktualisierung und Zeilenlöschung geladen, ohne dass eine inkrementelle Spalte erforderlich ist. | Nein | Boolean | enableNativeCdc |
Netto-Änderungen | Wenn Sie die Snowflake-Änderungsnachverfolgung verwenden, können Sie diese Option verwenden, um deduplizierte geänderte Zeilen oder umfassende Änderungen abzurufen. Deduplizierte geänderte Zeilen zeigen nur die neuesten Versionen der Zeilen an, die seit einem bestimmten Zeitpunkt geändert wurden, während umfassende Änderungen alle Versionen jeder geänderten Zeile anzeigen, einschließlich der Zeilen, die gelöscht oder aktualisiert wurden. Wenn Sie z. B. eine Zeile aktualisieren, wird eine Löschversion und eine Einfügeversion in den umfassenden Änderungen angezeigt, aber nur die Einfügeversion in deduplizierten geänderten Zeilen. Je nach Anwendungsfall können Sie die Option auswählen, die Ihren Bedürfnissen entspricht. Die Standardoption ist FALSCH, was umfassende Änderungen bedeutet. | Nein | Boolean | netChanges |
Systemspalten einschließen | Bei Verwendung der Snowflake-Änderungsnachverfolgung können Sie mithilfe der Option „systemColumns“ steuern, ob die von Snowflake bereitgestellten Metadatenstromspalten in die Ausgabe der Änderungsnachverfolgung einbezogen oder ausgeschlossen werden. Standardmäßig ist „systemColumns“ auf WAHR festgelegt, was bedeutet, dass die Metadatenstromspalten eingeschlossen sind. Sie können „systemColumns“ auf FALSCH festlegen, wenn Sie diese ausschließen möchten. | Nein | Boolean | systemColumns |
Lesen von Anfang an beginnen | Wenn Sie diese Option mit dem inkrementellen Extrahieren und der Änderungsnachverfolgung festlegen, wird ADF angewiesen, alle Zeilen bei der ersten Ausführung einer Pipeline mit aktiviertem inkrementellem Extrahieren zu lesen. | Nein | Boolean | skipInitialLoad |
Beispiele für Snowflake-Quellskripts
Wenn Sie das Snowflake-Dataset als Quelltyp verwenden, sieht das zugehörige Datenflussskript wie folgt aus:
source(allowSchemaDrift: true,
validateSchema: false,
query: 'select * from MYTABLE',
format: 'query') ~> SnowflakeSource
Wenn Sie ein Inlinedataset verwenden, sieht das zugehörige Datenflussskript wie folgt aus:
source(allowSchemaDrift: true,
validateSchema: false,
format: 'query',
query: 'select * from MYTABLE',
store: 'snowflake') ~> SnowflakeSource
Native Änderungsnachverfolgung
Azure Data Factory unterstützt jetzt ein natives Feature in Snowflake, das als Änderungsnachverfolgung bekannt ist, was das Nachverfolgen von Änderungen in Form von Protokollen umfasst. Dieses Feature von Snowflake ermöglicht es uns, die Änderungen der Daten im Zeitverlauf zu verfolgen, was für das inkrementelle Laden von Daten und für Überprüfungszwecke nützlich ist. Wenn Sie dieses Feature verwenden möchten, erstellen wir beim Aktivieren von Change Data Capture und Auswählen der Snowflake-Änderungsnachverfolgung ein Stream-Objekt für die Quelltabelle, das die Änderungsnachverfolgung in der Snowflake-Tabelle ermöglicht. Anschließend verwenden wir die CHANGES-Klausel in unserer Abfrage, um nur die neuen oder aktualisierten Daten aus der Quelltabelle abzurufen. Außerdem wird empfohlen, die Pipeline so zu planen, dass Änderungen innerhalb des Intervalls der Datenaufbewahrungszeit für die Snowflake-Quelltabelle verarbeitet werden, sonst kann der Benutzer inkonsistentes Verhalten bei erfassten Änderungen sehen.
Senkentransformation
In der folgenden Tabelle sind die von einer Snowflake-Senke unterstützten Eigenschaften aufgeführt. Sie können diese Eigenschaften auf der Registerkarte Einstellungen bearbeiten. Bei Verwendung eines Inlinedatasets werden zusätzliche Einstellungen angezeigt. Diese entsprechen den Eigenschaften, die im Abschnitt zu den Dataseteigenschaften beschrieben sind. Der Connector verwendet die interne Datenübertragung von Snowflake.
Name | BESCHREIBUNG | Erforderlich | Zulässige Werte | Datenflussskript-Eigenschaft |
---|---|---|---|---|
Updatemethode | Geben Sie an, welche Vorgänge für das Snowflake-Ziel zulässig sind. Um Aktualisierungs-, Upsert- oder Löschaktionen auf Zeilen anzuwenden, muss eine Zeilenänderungstransformation zum Kennzeichnen von Zeilen für diese Aktionen erfolgen. |
Ja |
true oder false |
deletable insertable updateable upsertable |
Schlüsselspalten | Für Update-, Upsert- und Löschvorgänge muss mindestens eine Schlüsselspalte festgelegt werden, um die Zeile zu bestimmen, die geändert werden soll. | Nein | Array | keys |
Aktion table | Bestimmt, ob die Zieltabelle vor dem Schreiben neu erstellt werden soll oder alle Zeilen aus der Zieltabelle entfernt werden sollen. - Keine: Es wird keine Aktion an der Tabelle vorgenommen. - Neu erstellen: Die Tabelle wird gelöscht und neu erstellt. Erforderlich, wenn eine neue Tabelle dynamisch erstellt wird. - Abschneiden: Alle Zeilen werden aus der Zieltabelle entfernt. |
Nein |
true oder false |
Neu erstellen truncate |
Beispiele für Snowflake-Senkenskripts
Wenn Sie das Snowflake-Dataset als Senkentyp verwenden, sieht das zugehörige Datenflussskript wie folgt aus:
IncomingStream sink(allowSchemaDrift: true,
validateSchema: false,
deletable:true,
insertable:true,
updateable:true,
upsertable:false,
keys:['movieId'],
format: 'table',
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> SnowflakeSink
Wenn Sie ein Inlinedataset verwenden, sieht das zugehörige Datenflussskript wie folgt aus:
IncomingStream sink(allowSchemaDrift: true,
validateSchema: false,
format: 'table',
tableName: 'table',
schemaName: 'schema',
deletable: true,
insertable: true,
updateable: true,
upsertable: false,
store: 'snowflake',
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> SnowflakeSink
Abfrage-Pushdown-Optimierung
Durch Festlegen des Protokolliergrads der Pipeline auf „Keine“ schließen wir die Übertragung von Zwischenmetriken für die Transformation aus, wodurch potenzielle Hindernisse für Spark-Optimierungen vermieden und die von Snowflake bereitgestellte Abfrage-Pushdown-Optimierung ermöglicht werden. Diese Pushdown-Optimierung ermöglicht erhebliche Leistungsverbesserungen für große Snowflake-Tabellen mit umfangreichen Datasets.
Hinweis
Temporäre Tabellen in Snowflake werden nicht unterstützt, da sie lokal für die Sitzung oder den Benutzer sind, die/der sie erstellt, wodurch sie für andere Sitzungen unzugänglich sind und anfällig dafür macht, von Snowflake als reguläre Tabellen überschrieben zu werden. Während Snowflake transiente Tabellen als Alternative bietet, die global zugänglich sind, erfordern sie ein manuelles Löschen, was unserem primären Ziel der Verwendung von temporäre Tabellen widerspricht, das darin besteht, jegliche Löschvorgänge im Quellschema zu vermeiden.
Eigenschaften der Lookup-Aktivität
Weitere Informationen zu den Eigenschaften finden Sie unter Lookup-Aktivität in Azure Data Factory.
Zugehöriger Inhalt
Eine Liste der Datenspeicher, die als Quellen und Senken für die Copy-Aktivität unterstützt werden, finden Sie unter Unterstützte Datenspeicher und Formate.