跳到主要内容

将数据从 BigQuery 导出到雪花:简单的方法

· 阅读需 11 分钟
弗里茨·拉科

云数据仓库

在过去几年中,我们看到云数据仓库(以及“仓库优先”范式)的使用快速增长。两个流行的云 DWH 平台是 BigQuery 和 Snowflake。查看下面的图表,了解它们随时间的演变。

图片:Gartner 来自 Twitter 上的 Adam Ronthal (@aronthal)。 图片:Gartner 来自 Twitter 上的 Adam Ronthal (@aronthal)。

BigQuery,截至 2021 年排名第 4,是由 Google Cloud Platform (GCP) 提供的完全托管的无服务器数据仓库服务。它可以对 PB 级数据进行简单且可扩展的分析,并且长期以来一直以其易用性和免维护特性而闻名。

Snowflake 是 Snowflake Inc. 公司提供的一项类似服务。主要区别之一是 Snowflake 允许您在 Amazon Web Services (AWS)、Azure (Microsoft) 或 GCP (Google) 中托管实例。如果您已经在非 GCP 环境中建立,这是一个很大的优势。

导出和加载数据

根据情况,有时需要或希望将数据从 BigQuery 环境复制到 Snowflake 环境中。让我们看一下并分解正确移动此数据所需的各种逻辑步骤,因为竞争服务都没有集成功能来轻松完成此操作。为了我们的示例,我们将假设我们的目标 Snowflake 环境托管在 AWS 上。

逐步程序

为了将数据从 BigQuery 迁移到 Snowflake (AWS),以下是基本步骤:

  1. 识别表或查询并执行 EXPORT DATA OPTIONS 查询以导出到 Google Cloud Storage (GCS)。

  2. 在VM或本地机器上运行脚本将GCS数据复制到Snowflake的Internal Stage。我们也可以通过存储集成直接从 GCS 读取数据,但这涉及另一层安全访问配置(这可能更适合您的用例)。

  3. 手动生成具有正确列数据类型的 CREATE TABLE DDL 并在 Snowflake 中执行。

  4. 在 Snowflake 中执行 COPY 查询以导入暂存文件。

  5. 可选择清理(删除)GCP 和内部阶段中的临时数据。

    图片:从 BigQuery 手动导出到 Snowflake 的步骤。

    如上所示,有几个步骤可以实现这一点,其中需要与独立系统进行交互。这对于自动化来说可能很麻烦,尤其是在目标系统中使用正确的列类型生成正确的 DDL(#3)(我个人认为这是最繁重的,请尝试对具有 50 多列的表执行此操作)。

    幸运的是,有一种更简单的方法可以做到这一点,那就是使用一个名为 Sling 的漂亮工具。 Sling 是一种数据集成工具,可以轻松高效地从/向数据库、存储平台和 SaaS 应用程序移动数据(提取和加载)。有两种使用方式:Sling CLI 和 Sling Cloud。我们将执行与上述相同的过程,但仅通过向 sling 提供输入,它会自动为我们执行复杂的步骤!

使用 Sling CLI

如果您是命令行狂热者,Sling CLI 适合您。它内置在 go 中(这使得它超级快),并且它适用于文件、数据库和各种 saas 端点。它还可以与 Unix Pipes 一起使用(读取标准输入并写入标准输出)。我们可以从我们的 shell 中快速安装它:

#  在 Mac 上
brew install slingdata-io/sling/sling

# 在 Windows Powershell 上
scoop bucket add org https://github.com/slingdata-io/scoop-sling.git
scoop install sling

# 通过 pip 使用 Python Wrapper
pip install sling

有关其他安装选项(包括 Linux),请参阅 此处。还有一个 Python 包装器 库,如果您更喜欢在 Python 中与 Sling 交互,它会很有用。

安装后,我们应该能够运行 sling 命令,它应该给我们这样的输出:

sling - An Extract-Load tool | https://slingdata.io/zh
Slings data from a data source to a data target.
Version 0.86.52

Usage:
sling [conns|run|update]

Subcommands:
conns Manage local connections
run Execute an ad-hoc task
update Update Sling to the latest version

Flags:
--version Displays the program version string.
-h --help Displays help with available flag, subcommand, and positional value parameters.

现在有很多方法可以配置任务,但是对于本文的范围,我们首先需要为 BigQuery 和 Snowflake 添加连接凭据(一次性的家务)。我们可以通过打开文件 ~/.sling/env.yaml 并添加凭据来做到这一点,它应该如下所示:

~/.sling/env.yaml
connections:

BIGQUERY:
type: bigquery
project: sling-project-123
location: US
dataset: public
gc_key_file: ~/.sling/sling-project-123-ce219ceaef9512.json
gc_bucket: sling_us_bucket # this is optional but recommended for bulk export.

SNOWFLAKE:
type: snowflake
username: fritz
password: my_pass23
account: abc123456.us-east-1
database: sling
schema: public

太好了,现在让我们测试我们的连接:

$ sling conns list
+------------+------------------+-----------------+
| CONN NAME | CONN TYPE | SOURCE |
+------------+------------------+-----------------+
| BIGQUERY | DB - Snowflake | sling env yaml |
| SNOWFLAKE | DB - PostgreSQL | sling env yaml |
+------------+------------------+-----------------+

$ sling conns test BIGQUERY
6:42PM INF success!

$ sling conns test SNOWFLAKE
6:42PM INF success!

太棒了,现在我们已经建立了连接,我们可以运行我们的任务:

$ sling run --src-conn BIGQUERY --src-stream "select user.name, activity.* from public.activity join public.user on user.id = activity.user_id where user.type != 'external'" --tgt-conn SNOWFLAKE --tgt-object 'public.activity_user' --mode full-refresh
11:37AM INF connecting to source database (bigquery)
11:37AM INF connecting to target database (snowflake)
11:37AM INF reading from source database
11:37AM INF writing to target database [mode: full-refresh]
11:37AM INF streaming data
11:37AM INF dropped table public.activity_user
11:38AM INF created table public.activity_user
11:38AM INF inserted 77668 rows
11:38AM INF execution succeeded

哇,这很容易! Sling 自动完成了我们之前描述的所有步骤。我们甚至可以将 Snowflake 数据导出回我们的 shell sdtout(以 CSV 格式),只需为 public.activity_user 标志提供表标识符 (--src-stream) 并计算行数以验证我们的数据:

$ sling run --src-conn SNOWFLAKE --src-stream public.activity_user --stdout | wc -l
11:39AM INF connecting to source database (snowflake)
11:39AM INF reading from source database
11:39AM INF writing to target stream (stdout)
11:39AM INF wrote 77668 rows
11:39AM INF execution succeeded
77669 # CSV output includes a header row (77668 + 1)

使用 Sling Cloud

现在让我们对 Sling Cloud 应用程序做同样的事情。 Sling Cloud 使用与 Sling CLI 相同的引擎,除了它是一个完全托管的平台,可以以有竞争力的价格运行您的所有 Extract-Load 需求(查看我们的 [定价页面]https://slingdata.io/zh/价钱))。使用 Sling Cloud,我们可以:

  • 与许多团队成员合作

  • 管理多个工作区/项目

  • 安排提取加载 (EL) 任务以间隔或固定时间 (CRON) 运行

  • 收集和分析日志以进行调试

  • 通过电子邮件或 Slack 的错误通知

  • 如果需要,可以从全球区域或自托管模式运行(其中 Sling Cloud 是协调器)。

  • 用于快速设置和执行的直观用户界面 (UI)

    首先是注册一个免费帐户 这里。登录后,我们可以选择 Cloud Mode(稍后将详细介绍 Self-Hosted 模式)。现在我们可以执行与上述类似的步骤,但使用 Sling Cloud UI:

添加 BigQuery 连接

步骤截图

转到 Connections,单击 New Connection,选择 Big Query

![bigquery-connection-step-1](2022-10-11-09-42-19.png)

输入姓名 BIGQUERY、您的凭据,然后上传您的 google 帐户 JSON 文件。点击 Test 测试连接性,然后点击 Save

bigquery-connection-step-2

添加雪花连接

步骤截图

单击 New Connection,选择 Snowflake

雪花连接步骤 1

输入名称 SNOWFLAKE 和您的凭据。点击 Test 测试连接性,然后点击 Save

雪花连接步骤 2

创建复制

步骤截图

转到 Replications,单击 New Replication,选择 Big Query 作为源,选择 Snowflake 作为目标。点击 Next

复制连接步骤 1

如果需要,调整为 Target Object Name Pattern,然后单击 Create

![复制-连接-步骤-2](2022-10-11-10-02-32.png)

创建并运行任务

步骤截图

转到 Streams 选项卡,单击 New SQL Stream,因为我们使用自定义 SQL 作为源数据。给它一个名字 (activity_user)。粘贴 SQL 查询,点击 Ok

![创建任务步骤 1](2022-10-11-17-36-05.png)

![创建任务步骤 2](2022-10-11-17-44-22.png)

现在我们已经准备好一个流任务,我们可以点击播放图标来触发它。

![运行任务步骤 1](2022-10-11-17-46-22.png)

一旦任务执行完成,我们可以检查日志。

![运行任务步骤 2](2022-10-11-17-48-56.png)

就是这样!我们有我们的任务设置,我们可以按需重新运行它或按计划设置它。您可能会注意到,与 Sling CLI 相比,Sling Cloud 为我们处理了更多的事情,并为面向 UI 的用户提供了更好的体验。

结论

我们处在一个数据就是黄金的时代,将数据从一个平台转移到另一个平台应该不难。正如我们所展示的,Sling 通过减少与数据集成相关的摩擦提供了一种强大的替代方案。我们将在另一篇文章中介绍如何从 Snowflake 导出并加载到 BigQuery。