技能 data-engineering-data-pipeline
📦

data-engineering-data-pipeline

低风险

构建可扩展的数据管道

设计生产级数据管道复杂且容易出错。本技能提供经过验证的架构模式以及ETL、流处理和湖仓系统的实施指导。

支持: Claude Codex Code(CC)
📊 71 充足
1

下载技能 ZIP

2

在 Claude 中上传

前往 设置 → 功能 → 技能 → 上传技能

3

开启并开始使用

测试它

正在使用“data-engineering-data-pipeline”。 Design a batch pipeline for daily customer data sync from MySQL to Snowflake

预期结果:

Architecture: ELT pattern with incremental loading. Components: (1) Extract using watermark column 'updated_at', (2) Load raw data to S3 staging, (3) Transform in Snowflake with dbt, (4) Validate with dbt tests, (5) Alert on failures via Slack. Key considerations: Handle late-arriving data, implement retry logic, monitor row count variance.

正在使用“data-engineering-data-pipeline”。 How do I handle schema evolution in a streaming pipeline?

预期结果:

Strategy: Use schema registry with compatibility checks. For additive changes, use default values. For breaking changes, implement dual-write during migration. Tools: Confluent Schema Registry for Kafka, Delta Lake schema evolution with mergeSchema option. Always test backward compatibility before deployment.

安全审计

低风险
v1 • 2/24/2026

All static analyzer findings are false positives. The skill is documentation-only, providing architectural guidance and educational code examples. No executable code, external commands, or security risks detected. Safe for publication.

1
已扫描文件
204
分析行数
3
发现项
1
审计总数
低风险问题 (3)
Static Analyzer False Positives - Weak Cryptographic Algorithm
Static analyzer flagged lines 3, 28, 39, 42, 94, and 167 as containing weak cryptographic algorithms. Review confirms these are false positives - the flagged lines contain architectural terms (ETL/ELT, Lambda, Kappa) and documentation headers, not cryptographic code.
Static Analyzer False Positive - External Command Execution
Static analyzer flagged line 124 as Ruby/shell backtick execution. Review confirms this is a Python code example showing batch ingestion patterns, not shell command execution.
Static Analyzer False Positives - Reconnaissance Patterns
Static analyzer flagged lines 49, 116, and 184 as system/network reconnaissance. Review confirms these are data pipeline terminology (metadata tracking fields, partitioning strategies, monitoring alerts), not reconnaissance activity.
审计者: claude

质量评分

38
架构
100
可维护性
87
内容
50
社区
84
安全
91
规范符合性

你能构建什么

新建管道架构

为一家从电子表格迁移到现代数据技术栈的初创公司从头设计完整的数据管道。

流式迁移策略

使用Kafka和流处理框架将现有批量管道转换为实时流式架构。

数据质量框架实施

使用Great Expectations和dbt测试实施全面的数据质量检查,并配置自动告警。

试试这些提示

基础管道设计
我需要构建一个数据管道,从PostgreSQL每日提取数据,进行转换,然后加载到数据仓库。我应该使用什么架构?关键组件有哪些?
流式架构选型
我们拥有应用产生的高容量事件数据,需要近实时分析。请为我们的用例(每分钟100万事件)比较Lambda与Kappa架构。
数据质量实施
向我展示如何使用Great Expectations为我们的订单表实施数据质量检查。我们需要验证订单ID的唯一性、客户ID的非空性以及订单金额为正数。
成本优化审查
我们每月的数据管道成本翻倍了。请审查我们的架构,并提供在保持SLA的前提下降低成本的具体建议。当前技术栈:Airflow、Spark、S3、Redshift。

最佳实践

  • 在选择架构模式之前,评估数据源、数据量、延迟要求和目标系统
  • 使用水印列实施增量处理,避免重新处理整个数据集
  • 在每个管道阶段添加数据质量关卡,并在验证失败时自动告警

避免

  • 不根据特定数据量和速度需求进行调整就直接复制生产模式
  • 根据趋势而非业务需求和团队能力选择架构
  • 优先考虑功能而非监控、可观测性和运维手册

常见问题

实时分析应该使用Lambda还是Kappa架构?
当需要批处理准确性和低延迟视图以及复杂聚合时选择Lambda。当只需要更简单的纯流处理且重放能力足够时选择Kappa。Kappa降低了运维复杂性,但需要健壮的流处理基础设施。
如何处理流式管道中的延迟到达数据?
使用事件时间处理和水印来定义延迟阈值。为可以重新处理的延迟数据实现侧输出。对于关键数据,维护定期运行的批处理校正作业以修复任何遗漏的记录。
数据湖存储应该使用什么文件格式?
对于使用压缩和谓词下推的列式分析工作负载使用Parquet。Delta Lake或Iceberg在Parquet之上添加了ACID事务、schema演进和时间旅行功能。根据对事务和元数据管理的需求进行选择。
转换时应该使用dbt还是Spark?
在数据仓库中使用dbt进行基于SQL的转换,并内置测试和文档功能。对于大规模数据处理、需要Python/Scala的复杂转换,或在加载到仓库之前处理数据湖时使用Spark。
如何在流式处理中实现精确一次处理?
将幂等接收器与事务处理结合使用。使用Kafka事务进行原子写入,使用检查点状态进行恢复,并设计幂等操作。对于数据库,使用具有唯一约束的upsert操作来防止重复。
数据管道有哪些必要的监控指标?
跟踪:每个阶段处理和失败的记录数、端到端延迟、数据新鲜度、管道成功率和资源利用率。在SLA违规、错误率飙升和数据质量失败时设置告警。监控趋势以识别容量问题,防止其导致服务中断。

开发者详情

文件结构

📄 SKILL.md