共计 8492 个字符,预计需要花费 22 分钟才能阅读完成。
在本文中,我们将了解如何通过连接 Apache Kafka 代理到 MongoDB Atlas 并使用 CDC 解决方案将数据从 MongoDB Atlas 流式传输到 SingleStore Kai。同时,我们还将使用 Metabase 创建一个简单的 SingleStore Kai 分析仪表板。
本文末尾附上所使用的代码本文的出处链接。
https://github.com/VeryFatBoy/adtech-kafka-cdc
简介
CDC 是一种追踪数据库或系统中发生的更改的方法。SingleStore 现在提供与 MongoDB 配合使用的 CDC 解决方案。
为了演示 CDC 解决方案,我们将使用 Kafka 代理将数据流式传输到 MongoDB Atlas 集群,并使用 CDC 管道将数据从 MongoDB Atlas 传播到 SingleStore Kai。我们还将使用 Metabase 创建一个简单的分析仪表板。
下图显示了系统的高级架构。
以后会发布重点介绍使用 CDC 解决方案的其他场景的文章。
MongoDB Atlas
我们将在 M0 Sandbox 中使用 MongoDB Atlas。在 Database Access 下,我们将配置一个具有 atlasAdmin 权限的管理员用户。在 Network Access 下,我们将临时允许从任何地方(IP 地址 0.0.0.0/0)访问。我们将记录用户名、密码和主机。
Apache Kafka
我们将配置一个 Kafka 代理将数据流式传输到 MongoDB Atlas。我们将使用 Jupyter Notebook 来实现这一目标。
首先,我们需要安装一些库:
!pip install pymongo kafka-python --quiet
接下来,我们将连接到 MongoDB Atlas 和 Kafka 代理:
from kafka import KafkaConsumer
from pymongo import MongoClient
try:
client = MongoClient("mongodb+srv://:@/?retryWrites=true&w=majority")
db = client.adtech
print("连接成功")
except:
print("无法连接")
consumer = KafkaConsumer(
"ad_events",
bootstrap_servers = ["public-kafka.memcompute.com:9092"]
)
我们将使用之前从 MongoDB Atlas 保存的值替换 `
首先,我们将加载 100 条记录到 MongoDB Atlas 中:
MAX_ITERATIONS = 100
for iteration, message in enumerate(consumer, start = 1):
if iteration > MAX_ITERATIONS:
break
try:
record = message.value.decode("utf-8")
user_id, event_name, advertiser, campaign, gender, income, page_url, region, country = map(str.strip, record.split("t"))
events_record = {"user_id": int(user_id),
"event_name": event_name,
"advertiser": advertiser,
"campaign": int(campaign.split()[0]),
"gender": gender,
"income": income,
"page_url": page_url,
"region": region,
"country": country
}
db.events.insert_one(events_record)
except Exception as e:
print(f"Iteration {iteration}: 无法插入数据 - {str(e)}")
数据应成功加载,并且我们应该看到一个名为 `adtech` 的数据库和一个名为 `events` 的集合。集合中的文档应具有类似以下示例的结构:
_id: ObjectId('64ec906d0e8c0f7bcf72a8ed')
user_id: 3857963415
event_name: "Impression"
advertiser: "Sherwin-Williams"
campaign: 13
gender: "Female"
income: "25k and below",
page_url: "/2013/02/how-to-make-glitter-valentines-heart-boxes.html/"
region: "Michigan"
country: "US"
这些文档表示广告活动事件。events 集合存储了有关广告商、广告活动以及用户的各种人口统计信息,如性别和收入。
SingleStore Kai
SingleStore Kai 是一个实时分析平台,可以处理大规模的数据和查询。我们将使用 CDC pipeline 将数据从 MongoDB Atlas 传播到 SingleStore Kai。
之前的文章展示了创建免费 SingleStoreDB Cloud 帐户的步骤。我们将使用以下设置:
工作区组名称:CDC Demo Group
云提供商:AWS
地区:US East 1 (N. Virginia)
工作区名称:cdc-demo
大小:S-00
设置:
-
选择 SingleStore Kai 一旦工作区可用,我们会记下密码和主机信息。从 CDC Demo Group > Overview > Workspaces > cdc-demo > Connect > Connect Directly > SQL IDE > Host 可以找到主机信息。我们稍后在 Metabase 中会需要这些信息。
-
我们还会通过配置 CDC Demo Group > Firewall 临时允许从任何地方访问。
从左侧导航栏中,我们会选择 DEVELOP > SQL Editor 来创建一个 adtech 数据库和链接,如下所示:
CREATE DATABASE IF NOT EXISTS adtech;
USE adtech;
DROP LINK adtech.link;
CREATE LINK adtech.link AS MONGODB
CONFIG '{"mongodb.hosts":":27017, :27017, :27017","collection.include.list":"adtech.*","mongodb.ssl.enabled":"true","mongodb.authsource":"admin","mongodb.members.auto.discover":"false"}'
CREDENTIALS '{"mongodb.user":"","mongodb.password":""}';
CREATE TABLES AS INFER PIPELINE AS LOAD DATA LINK adtech.link '*' FORMAT AVRO;
我们会用之前从 MongoDB Atlas 保存的值替换
现在我们来检查是否有任何表,如下所示:
SHOW TABLES;
这应该会显示一个名为 events 的表:
+------------------+
| Tables_in_adtech |
+------------------+
| events |
+------------------+
我们来检查表的结构:
DESCRIBE events;
输出应该如下所示:
+-------+------+------+------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------+------+------+------+---------+-------+
| _id | text | NO | UNI | NULL | |
| _more | JSON | NO | | NULL | |
+-------+------+------+------+---------+-------+
接下来,我们来检查是否有任何 pipelines:
SHOW PIPELINES;
这将显示一个名为 events 的 pipeline,目前处于停止状态:
+---------------------+---------+-----------+
| Pipelines_in_adtech | State | Scheduled |
+---------------------+---------+-----------+
| events | Stopped | False |
+---------------------+---------+-----------+
现在我们来启动 events pipeline:
START ALL PIPELINES;
状态应该变为 Running:
+---------------------+---------+-----------+
| Pipelines_in_adtech | State | Scheduled |
+---------------------+---------+-----------+
| events | Running | False |
+---------------------+---------+-----------+
如果我们现在运行以下命令:
SELECT COUNT(*) FROM events;
它应该返回结果 100:
+----------+
| COUNT(*) |
+----------+
| 100 |
+----------+
我们检查 events 表中的一行,如下所示:
SELECT * FROM events LIMIT 1;
输出应类似于以下内容:
+--------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| _id | _more |
+--------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| {"$oid": "64ec906d0e8c0f7bcf72a8f7"} | {"_id":{"$oid":"64ec906d0e8c0f7bcf72a8f7"},"advertiser":"Wendys","campaign":13,"country":"US","event_name":"Click","gender":"Female","income":"75k - 99k","page_url":"/2014/05/flamingo-pop-bridal-shower-collab-with.html","region":"New Mexico","user_id":3857963416} |
+--------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
成功连接到 MongoDB Atlas 并将所有 100 条记录复制到 SingleStore Kai 的 CDC 解决方案。
现在,让我们使用 Metabase 创建一个仪表板。
Metabase
在之前的文章中,我们详细介绍了安装、配置和创建与 Metabase 的连接的方法。我们将使用稍微变化的查询来创建可视化。
1. 事件总数 SQL
SELECT COUNT(*) FROM events;
2. 按地区统计事件 SQL
SELECT _more::country AS `events.country`, COUNT(_more::country) AS 'events.countofevents'
FROM adtech.events AS events
GROUP BY 1;
3. 前 5 个广告商的事件数量 SQL
SELECT _more::advertiser AS `events.advertiser`, COUNT(*) AS `events.count`
FROM adtech.events AS events
WHERE (_more::advertiser LIKE '%Subway%' OR _more::advertiser LIKE '%McDonals%' OR _more::advertiser LIKE '%Starbucks%' OR _more::advertiser LIKE '%Dollar General%' OR _more::advertiser LIKE '%YUM! Brands%' OR _more::advertiser LIKE '%Dunkin Brands Group%')
GROUP BY 1
ORDER BY `events.count` DESC;
4. 广告访客按性别和收入分类 SQL
SELECT *
FROM (SELECT *, DENSE_RANK() OVER (ORDER BY xx.z___min_rank) AS z___pivot_row_rank, RANK() OVER (PARTITION BY xx.z__pivot_col_rank ORDER BY xx.z___min_rank) AS z__pivot_col_ordering, CASE
WHEN xx.z___min_rank = xx.z___rank THEN 1
ELSE 0
END AS z__is_highest_ranked_cell
FROM (SELECT *, Min(aa.z___rank) OVER (PARTITION BY aa.`events.income`) AS z___min_rank
FROM (SELECT *, RANK() OVER (ORDER BY CASE
WHEN bb.z__pivot_col_rank = 1 THEN (CASE
WHEN bb.`events.count` IS NOT NULL THEN 0
ELSE 1
END)
ELSE 2
END, CASE
WHEN bb.z__pivot_col_rank = 1 THEN bb.`events.count`
ELSE NULL
END DESC, bb.`events.count` DESC, bb.z__pivot_col_rank, bb.`events.income`) AS z___rank
FROM (SELECT *, DENSE_RANK() OVER (ORDER BY CASE
WHEN ww.`events.gender` IS NULL THEN 1
ELSE 0
END, ww.`events.gender`) AS z__pivot_col_rank
FROM (SELECT _more::gender AS `events.gender`, _more::income AS `events.income`, COUNT(*) AS `events.count`
FROM adtech.events AS events
WHERE (_more::income 'unknown' OR _more::income IS NULL)
GROUP BY 1, 2) ww) bb
WHERE bb.z__pivot_col_rank 下图显示了在 AdTech 仪表板上调整大小和位置的图表示例。
我们将设置自动刷新选项为 1 分钟。文章来源:https://www.toymoban.com/diary/system/666.html
如果我们在 MongoDB Atlas 中使用 Jupyter 笔记本加载更多数据,只需更改 MAX_ITERATIONS,我们将看到数据传播到 SingleStore Kai,并在 AdTech 仪表板中反映出新的数据。文章来源地址 https://www.toymoban.com/diary/system/666.html
到此这篇关于使用 Kafka 和 CDC 将数据从 MongoDB Atlas 流式传输到 SingleStore Kai 的文章就介绍到这了, 更多相关内容可以在右上角搜索或继续浏览下面的相关文章,希望大家以后多多支持 TOY 模板网!