跳到主要内容

3 篇博文 含有标签「elt」

查看所有标签

· 阅读需 11 分钟
弗里茨·拉科

云数据仓库

在过去几年中,我们看到云数据仓库(以及“仓库优先”范式)的使用快速增长。两个流行的云 DWH 平台是 BigQuery 和 Snowflake。查看下面的图表,了解它们随时间的演变。

图片:Gartner 来自 Twitter 上的 Adam Ronthal (@aronthal)。 图片:Gartner 来自 Twitter 上的 Adam Ronthal (@aronthal)。

BigQuery,截至 2021 年排名第 4,是由 Google Cloud Platform (GCP) 提供的完全托管的无服务器数据仓库服务。它可以对 PB 级数据进行简单且可扩展的分析,并且长期以来一直以其易用性和免维护特性而闻名。

Snowflake 是 Snowflake Inc. 公司提供的一项类似服务。主要区别之一是 Snowflake 允许您在 Amazon Web Services (AWS)、Azure (Microsoft) 或 GCP (Google) 中托管实例。如果您已经在非 GCP 环境中建立,这是一个很大的优势。

导出和加载数据

根据情况,有时需要或希望将数据从 BigQuery 环境复制到 Snowflake 环境中。让我们看一下并分解正确移动此数据所需的各种逻辑步骤,因为竞争服务都没有集成功能来轻松完成此操作。为了我们的示例,我们将假设我们的目标 Snowflake 环境托管在 AWS 上。

逐步程序

为了将数据从 BigQuery 迁移到 Snowflake (AWS),以下是基本步骤:

  1. 识别表或查询并执行 EXPORT DATA OPTIONS 查询以导出到 Google Cloud Storage (GCS)。

  2. 在VM或本地机器上运行脚本将GCS数据复制到Snowflake的Internal Stage。我们也可以通过存储集成直接从 GCS 读取数据,但这涉及另一层安全访问配置(这可能更适合您的用例)。

  3. 手动生成具有正确列数据类型的 CREATE TABLE DDL 并在 Snowflake 中执行。

  4. 在 Snowflake 中执行 COPY 查询以导入暂存文件。

  5. 可选择清理(删除)GCP 和内部阶段中的临时数据。

    图片:从 BigQuery 手动导出到 Snowflake 的步骤。

    如上所示,有几个步骤可以实现这一点,其中需要与独立系统进行交互。这对于自动化来说可能很麻烦,尤其是在目标系统中使用正确的列类型生成正确的 DDL(#3)(我个人认为这是最繁重的,请尝试对具有 50 多列的表执行此操作)。

    幸运的是,有一种更简单的方法可以做到这一点,那就是使用一个名为 Sling 的漂亮工具。 Sling 是一种数据集成工具,可以轻松高效地从/向数据库、存储平台和 SaaS 应用程序移动数据(提取和加载)。有两种使用方式:Sling CLI 和 Sling Cloud。我们将执行与上述相同的过程,但仅通过向 sling 提供输入,它会自动为我们执行复杂的步骤!

使用 Sling CLI

如果您是命令行狂热者,Sling CLI 适合您。它内置在 go 中(这使得它超级快),并且它适用于文件、数据库和各种 saas 端点。它还可以与 Unix Pipes 一起使用(读取标准输入并写入标准输出)。我们可以从我们的 shell 中快速安装它:

#  在 Mac 上
brew install slingdata-io/sling/sling

# 在 Windows Powershell 上
scoop bucket add org https://github.com/slingdata-io/scoop-sling.git
scoop install sling

# 通过 pip 使用 Python Wrapper
pip install sling

有关其他安装选项(包括 Linux),请参阅 此处。还有一个 Python 包装器 库,如果您更喜欢在 Python 中与 Sling 交互,它会很有用。

安装后,我们应该能够运行 sling 命令,它应该给我们这样的输出:

sling - An Extract-Load tool | https://slingdata.io/zh
Slings data from a data source to a data target.
Version 0.86.52

Usage:
sling [conns|run|update]

Subcommands:
conns Manage local connections
run Execute an ad-hoc task
update Update Sling to the latest version

Flags:
--version Displays the program version string.
-h --help Displays help with available flag, subcommand, and positional value parameters.

现在有很多方法可以配置任务,但是对于本文的范围,我们首先需要为 BigQuery 和 Snowflake 添加连接凭据(一次性的家务)。我们可以通过打开文件 ~/.sling/env.yaml 并添加凭据来做到这一点,它应该如下所示:

~/.sling/env.yaml
connections:

BIGQUERY:
type: bigquery
project: sling-project-123
location: US
dataset: public
gc_key_file: ~/.sling/sling-project-123-ce219ceaef9512.json
gc_bucket: sling_us_bucket # this is optional but recommended for bulk export.

SNOWFLAKE:
type: snowflake
username: fritz
password: my_pass23
account: abc123456.us-east-1
database: sling
schema: public

太好了,现在让我们测试我们的连接:

$ sling conns list
+------------+------------------+-----------------+
| CONN NAME | CONN TYPE | SOURCE |
+------------+------------------+-----------------+
| BIGQUERY | DB - Snowflake | sling env yaml |
| SNOWFLAKE | DB - PostgreSQL | sling env yaml |
+------------+------------------+-----------------+

$ sling conns test BIGQUERY
6:42PM INF success!

$ sling conns test SNOWFLAKE
6:42PM INF success!

太棒了,现在我们已经建立了连接,我们可以运行我们的任务:

$ sling run --src-conn BIGQUERY --src-stream "select user.name, activity.* from public.activity join public.user on user.id = activity.user_id where user.type != 'external'" --tgt-conn SNOWFLAKE --tgt-object 'public.activity_user' --mode full-refresh
11:37AM INF connecting to source database (bigquery)
11:37AM INF connecting to target database (snowflake)
11:37AM INF reading from source database
11:37AM INF writing to target database [mode: full-refresh]
11:37AM INF streaming data
11:37AM INF dropped table public.activity_user
11:38AM INF created table public.activity_user
11:38AM INF inserted 77668 rows
11:38AM INF execution succeeded

哇,这很容易! Sling 自动完成了我们之前描述的所有步骤。我们甚至可以将 Snowflake 数据导出回我们的 shell sdtout(以 CSV 格式),只需为 public.activity_user 标志提供表标识符 (--src-stream) 并计算行数以验证我们的数据:

$ sling run --src-conn SNOWFLAKE --src-stream public.activity_user --stdout | wc -l
11:39AM INF connecting to source database (snowflake)
11:39AM INF reading from source database
11:39AM INF writing to target stream (stdout)
11:39AM INF wrote 77668 rows
11:39AM INF execution succeeded
77669 # CSV output includes a header row (77668 + 1)

使用 Sling Cloud

现在让我们对 Sling Cloud 应用程序做同样的事情。 Sling Cloud 使用与 Sling CLI 相同的引擎,除了它是一个完全托管的平台,可以以有竞争力的价格运行您的所有 Extract-Load 需求(查看我们的 [定价页面]https://slingdata.io/zh/价钱))。使用 Sling Cloud,我们可以:

  • 与许多团队成员合作

  • 管理多个工作区/项目

  • 安排提取加载 (EL) 任务以间隔或固定时间 (CRON) 运行

  • 收集和分析日志以进行调试

  • 通过电子邮件或 Slack 的错误通知

  • 如果需要,可以从全球区域或自托管模式运行(其中 Sling Cloud 是协调器)。

  • 用于快速设置和执行的直观用户界面 (UI)

    首先是注册一个免费帐户 这里。登录后,我们可以选择 Cloud Mode(稍后将详细介绍 Self-Hosted 模式)。现在我们可以执行与上述类似的步骤,但使用 Sling Cloud UI:

添加 BigQuery 连接

步骤截图

转到 Connections,单击 New Connection,选择 Big Query

![bigquery-connection-step-1](2022-10-11-09-42-19.png)

输入姓名 BIGQUERY、您的凭据,然后上传您的 google 帐户 JSON 文件。点击 Test 测试连接性,然后点击 Save

bigquery-connection-step-2

添加雪花连接

步骤截图

单击 New Connection,选择 Snowflake

雪花连接步骤 1

输入名称 SNOWFLAKE 和您的凭据。点击 Test 测试连接性,然后点击 Save

雪花连接步骤 2

创建复制

步骤截图

转到 Replications,单击 New Replication,选择 Big Query 作为源,选择 Snowflake 作为目标。点击 Next

复制连接步骤 1

如果需要,调整为 Target Object Name Pattern,然后单击 Create

![复制-连接-步骤-2](2022-10-11-10-02-32.png)

创建并运行任务

步骤截图

转到 Streams 选项卡,单击 New SQL Stream,因为我们使用自定义 SQL 作为源数据。给它一个名字 (activity_user)。粘贴 SQL 查询,点击 Ok

![创建任务步骤 1](2022-10-11-17-36-05.png)

![创建任务步骤 2](2022-10-11-17-44-22.png)

现在我们已经准备好一个流任务,我们可以点击播放图标来触发它。

![运行任务步骤 1](2022-10-11-17-46-22.png)

一旦任务执行完成,我们可以检查日志。

![运行任务步骤 2](2022-10-11-17-48-56.png)

就是这样!我们有我们的任务设置,我们可以按需重新运行它或按计划设置它。您可能会注意到,与 Sling CLI 相比,Sling Cloud 为我们处理了更多的事情,并为面向 UI 的用户提供了更好的体验。

结论

我们处在一个数据就是黄金的时代,将数据从一个平台转移到另一个平台应该不难。正如我们所展示的,Sling 通过减少与数据集成相关的摩擦提供了一种强大的替代方案。我们将在另一篇文章中介绍如何从 Snowflake 导出并加载到 BigQuery。

· 阅读需 8 分钟
弗里茨·拉科

背景

Sling CLI 简介

Sling CLI 是一个命令行工具,它允许从/向数据库、存储平台和 SaaS 应用程序轻松高效地移动数据(提取和加载)。入门很简单,如果您安装了 Python 的 pip,您只需运行 pip install sling。或者,您可以 这里 为您的机器下载二进制文件。

连接凭据

为了使用 sling,我们必须首先配置连接凭据,Sling CLI 会在各个地方查找它们。如果您已经在使用 dbt 等其他工具,或者在环境变量中设置了连接 U RL,则这允许“即插即用”性质。但是,建议使用 Sling 的 env.yaml 文件,因为它可以提供更加一致和灵活的体验。

Sling 环境文件

第一次运行 sling 命令时,会在当前用户的主目录 (.sling) 中创建 ~/.sling 文件夹,该目录中又包含一个名为 env.yaml 的文件。 Sling 的 Env 文件的结构很简单,您将连接的凭据放在 connections 键下,如下所示:

connections:
marketing_pg:
url: 'postgres://...'
ssh_tunnel: 'ssh://...' # optional

# 或 dbt 配置文件样式
marketing_pg:
type: postgres
host: [hostname]
user: [username]
password: [password]
port: [port]
dbname: [database name]
schema: [dbt schema]
ssh_tunnel: 'ssh://...'

finance_bq:
type: bigquery
method: service-account
project: [GCP project id]
dataset: [the name of your dbt dataset]
keyfile: [/path/to/bigquery/keyfile.json]

# 全局变量,在运行时可用于所有连接(可选)
variables:
aws_access_key: '...'
aws_secret_key: '...'

请参阅 此处 了解所有接受的连接类型及其各自所需的数据点。

sling conns list 命令与 Sling Env 凭据一起使用时,SOURCE 列将显示为 sling env yaml

环境变量

如果你更喜欢使用环境变量,在你的 shell 环境中设置它们就足够了:

# Mac / Linux
export MY_PG='postgresql://user:mypassw@pg.host:5432/db1'
export MY_SNOWFLAKE='snowflake://user:mypassw@sf.host/db1'
export ORACLE_DB='oracle://user:mypassw@orcl.host:1521/db1'

# Windows Powershell
set MY_PG 'postgresql://user:mypassw@pg.host:5432/db1'
set MY_SNOWFLAKE 'snowflake://user:mypassw@sf.host/db1'
set ORACLE_DB 'oracle://user:mypassw@orcl.host:1521/db1'

sling conns list 命令与环境变量一起使用时,SOURCE 列将显示为 env variable

DBT 配置文件

dbt 是许多数据专业人员日常使用的另一种流行工具,支持现有的本地配置文件可以轻松交叉使用。 dbt 凭据的典型位置位于 ~/dbt/profiles.yml 文件中。有关详细信息,请参阅 此处

如果您拥有 dbt 凭据并使用 sling conns list 命令,SOURCE 列将显示为 dbt profiles yaml

conns 子命令

现在您已经设置了凭据,sling 提供了一个 conns 子命令来与连接进行交互。我们可以执行以下操作:listtestdiscover

$ sling conns -h
conns - Manage local connections

Usage:
conns [discover|list|test]

Subcommands:
discover list available streams in connection
list list local connections detected
test test a local connection

Flags:
--version Displays the program version string.
-h --help Displays help with available flag, subcommand, and positional value parameters.

列出连接

查看和列出我们环境中可用的所有连接很方便。我们可以简单地运行 sling conns list 命令。这是一个例子:

$ sling conns list
+----------------------+------------------+-------------------+
| CONN NAME | CONN TYPE | SOURCE |
+----------------------+------------------+-------------------+
| AWS_S3 | FileSys - S3 | sling env yaml |
| AZURE_STORAGE | FileSys - Azure | sling env yaml |
| BIGQUERY | DB - BigQuery | sling env yaml |
| BIONIC_DB1 | DB - PostgreSQL | dbt profiles yaml |
| BTD_S3 | FileSys - S3 | sling env yaml |
| CLICKHOUSE | DB - Clickhouse | sling env yaml |
| DEMO_POSTGRES | DB - PostgreSQL | sling env yaml |
| GITHUB_DBIO | API - GitHub | sling env yaml |
| NOTION | API - Notion | sling env yaml |
| SNOWFLAKE | DB - Snowflake | env variable |
| STEAMPIPE | DB - PostgreSQL | sling env yaml |
+----------------------+------------------+-------------------+

测试连接

Sling CLI 工具还允许测试连接。一旦我们知道连接名称,我们就可以使用 sling conns test 命令:

$ sling conns test -h
test - test a local connection

Usage:
test [name]

Positional Variables:
name The name of the connection to test (Required)
Flags:
--version Displays the program version string.
-h --help Displays help with available flag, subcommand, and positional value parameters.

这是一个实际的例子:

$ sling conns test MSSQL
6:42PM INF success!

发现连接流

这是另一个漂亮的子命令,它允许人们查看从特定连接读取哪些数据流可用于 slingsling conns discover 命令。

$ sling conns discover -h
discover - list available streams in connection

Usage:
discover [name]

Positional Variables:
name The name of the connection to test (Required)
Flags:
--version Displays the program version string.
-h --help Displays help with available flag, subcommand, and positional value parameters.
-f --filter filter stream name by pattern (e.g. account_*)
--folder discover streams in a specific folder (for file connections)
--schema discover streams in a specific schema (for database connections)

对于数据库连接,它将列出可用的表和视图。对于存储连接,它将列出位于指定源文件夹中的非递归文件对象。对于 SaaS/API 连接,它将列出所有可供消费的可用对象。下面是一些例子。

数据库连接

$ sling conns discover CLICKHOUSE
6:57PM INF Found 68 streams:
- "default"."docker_logs"
- "default"."sling_docker_logs"
- "system"."aggregate_function_combinators"
- "system"."asynchronous_metric_log"
- "system"."asynchronous_metrics"
- "system"."build_options"
- "system"."clusters"
....

如果我们想过滤特定的 shema,我们可以这样做:

$ sling conns discover CLICKHOUSE --schema default
8:29PM INF Found 2 streams:
- "default"."docker_logs"
- "default"."sling_docker_logs"

SaaS 连接

$ sling conns discover NOTION
6:58PM INF Found 4 streams:
- blocks
- databases
- pages
- users

存储连接

$ sling conns discover AWS_S3
6:52PM INF Found 7 streams:
- s3://my-sling-bucket/logging/
- s3://my-sling-bucket/part.01.0001.csv
- s3://my-sling-bucket/sling/
- s3://my-sling-bucket/temp/
- s3://my-sling-bucket/test.fs.write/
- s3://my-sling-bucket/test/
- s3://my-sling-bucket/test_1000.csv

如果我们想查看子文件夹中的文件,我们可以这样做:

$ sling conns discover AWS_S3 --folder s3://my-sling-bucket/logging/
6:55PM INF Found 1 streams:
- s3://my-sling-bucket/logging/1/1.log.gz

正在运行 EL 任务

现在您的连接已设置好,您可以运行一些提取和加载任务了!我们在另一篇文章中对此进行了详细介绍,您可以在 此处 阅读相关内容。从命令行,您还可以运行 sling run -e,这将打印一堆示例。

结论

综合考虑,Sling CLI 使您可以轻松地从您的 shell 管理各种类型的连接并与之交互。如果您有任何问题或建议,请随时与我们联系 这里。此外,请务必查看 Sling Cloud,这是一项以具有竞争力的价格模式处理您的各种提取和加载需求的云服务。

· 阅读需 1 分钟
弗里茨·拉科

背景

Sling 是一款功能强大的现代数据集成工具,可以轻松高效地从流行的数据源中提取数据并将数据加载到目的地。

为什么是Sling?

  • 超快的性能 - 核心引擎用 Go 编写并采用流式设计,通过在内存中保存最少的数据使其超级高效。
  • 快速复制数据 - 轻松地将数据从源数据库、文件或 SaaS 连接复制到目标数据库或文件。
  • 透明且低成本 - Sling 在高效且低成本的模型上运行。我们的目标是对使用我们平台的成本保持透明。

详细了解 Sling 的工作原理 注册Sling