数据工程中的单元测试完全指南

2023-09-22 14:17:42

在数据工程领域中,经常被忽视的一项实践是单元测试。许多人可能认为单元测试仅仅是一种软件开发方法论,但事实远非如此。随着我们努力构建稳健、无错误的数据流水线和SQL数据模型,单元测试在数据工程中的价值变得越来越清晰。

本文带你深入探索如何将这些成熟的软件工程实践应用到数据工程中。

1 单元测试的重要性

在数据工程的背景下,采用单元测试可以确保您的数据和业务逻辑的准确性,进而产出高质量的数据,获得您的数据分析师、科学家和决策者对数据的信任。

2 单元测试数据流水线

数据流水线通常涉及复杂的数据抽取、转换和加载(ETL)操作序列,出错的可能性很大。为了对这些操作进行单元测试,我们将流水线拆分为单个组件,并对每个组件进行独立验证。

以一个简单的流水线为例,该流水线从CSV文件中提取数据,通过清除空值来转换数据,然后将其加载到数据库中。以下是使用pandas的基于Python的示例:

import pandas as pd
  from sqlalchemy import create_engine
  # 加载CSV文件的函数
  def load_data(file_name):
      data = pd.read_csv(file_name)
      return data
  # 清理数据的函数
  def clean_data(data):
      data = data.dropna()
      return data
  # 将数据保存到SQL数据库的函数
  def save_data(data, db_string, table_name):
      engine = create_engine(db_string)
      data.to_sql(table_name, engine, if_exists='replace')
  # 运行数据流水线
  data = load_data('data.csv')
  data = clean_data(data)
  save_data(data, 'sqlite:///database.db', 'my_table')

为了对这个流水线进行单元测试,我们使用像pytest这样的库为每个函数编写单独的测试。

在这个示例中,有三个主要的函数:load_data、clean_data和save_data。我们会为每个函数编写测试。对于load_data和save_data,需要设置一个临时的CSV文件和SQLite数据库,可以使用pytest库的fixture功能来实现。

import os
  import pandas as pd
  import pytest
  from sqlalchemy import create_engine, inspect
  # 使用pytest fixture来设置临时的CSV文件和SQLite数据库
  @pytest.fixture
  def csv_file(tmp_path):
      data = pd.DataFrame({
          'name': ['John', 'Jane', 'Doe'],
          'age': [34, None, 56]  # Jane的年龄缺失
      })
      file_path = tmp_path / "data.csv"
      data.to_csv(file_path, index=False)
      return file_path
  @pytest.fixture
  def sqlite_db(tmp_path):
      file_path = tmp_path / "database.db"
      return 'sqlite:///' + str(file_path)
  def test_load_data(csv_file):
      data = load_data(csv_file)
      
      assert 'name' in data.columns
      assert 'age' in data.columns
      assert len(data) == 3
  def test_clean_data(csv_file):
      data = load_data(csv_file)
      data = clean_data(data)
      
      assert data['age'].isna().sum() == 0
      assert len(data) == 2  # Jane的记录应该被删除
  def test_save_data(csv_file, sqlite_db):
      data = load_data(csv_file)
      data = clean_data(data)
      save_data(data, sqlite_db, 'my_table')
      
      # 检查数据是否保存正确
      engine = create_engine(sqlite_db)
      inspector = inspect(engine)
      tables = inspector.get_table_names()
      
      assert 'my_table' in tables
      
      loaded_data = pd.read_sql('my_table', engine)
      assert len(loaded_data) == 2  # 只应该存在John和Doe的记录

这里是另一个例子:假设您有一个从CSV文件中加载数据并将其中的“日期”列从字符串转换为日期时间的流水线:

def convert_date(data, date_column):
      data[date_column] = pd.to_datetime(data[date_column])
      return data

为上述函数编写的单元测试将传入具有已知日期字符串格式的DataFrame。然后,它将验证函数是否正确将日期转换为日期时间对象,并且它是否适当处理无效格式。

我们为上述场景编写一个单元测试。该测试首先使用有效日期检查函数,断言输出DataFrame中的“date”列确实是datetime类型,并且值与预期相符。然后,它检查在给出无效日期时,函数是否正确引发了ValueError。

import pandas as pd
  import pytest
  def test_convert_date():
      # 使用有效日期进行测试
      test_data = pd.DataFrame({
          'date': ['2021-01-01', '2021-01-02']
      })
      
      converted_data = convert_date(test_data.copy(), 'date')
      
      assert pd.api.types.is_datetime64_any_dtype(converted_data['date'])
      assert converted_data.loc[0, 'date'] == pd.Timestamp('2021-01-01')
      assert converted_data.loc[1, 'date'] == pd.Timestamp('2021-01-02')
      # 使用无效日期进行测试
      test_data = pd.DataFrame({
          'date': ['2021-13-01']  # 这个日期是无效的,因为没有第13个月
      })
      
      with pytest.raises(ValueError):
          convert_date(test_data, 'date')

以下是最后一个例子:假设您有一个加载数据并进行聚合的流水线,计算每个地区的总销售额:

def aggregate_sales(data):
      aggregated = data.groupby('region').sales.sum().reset_index()
      return aggregated

为该函数编写的单元测试将向其传递具有各个地区销售数据的DataFrame。测试将验证函数是否正确计算每个地区的总销售额。

我们为该函数编写一个单元测试。在这个测试中,我们首先向aggregate_sales函数传递一个具有已知销售数据的DataFrame,并检查它是否正确聚合了销售额。然后,向其传递一个没有销售数据的DataFrame,并检查它是否正确将这些销售额聚合为0。这样可以确保函数正确处理典型情况和边缘情况。

以下是使用pytest库为aggregate_sales函数编写单元测试的示例:

import pandas as pd
  import pytest
  def test_aggregate_sales():
      # 各个地区的销售数据
      test_data = pd.DataFrame({
          'region': ['North', 'North', 'South', 'South', 'East', 'East', 'West', 'West'],
          'sales': [100, 200, 300, 400, 500, 600, 700, 800]
      })
      
      aggregated = aggregate_sales(test_data)
      
      assert aggregated.loc[aggregated['region'] == 'North', 'sales'].values[0] == 300
      assert aggregated.loc[aggregated['region'] == 'South', 'sales'].values[0] == 700
      assert aggregated.loc[aggregated['region'] == 'East', 'sales'].values[0] == 1100
      assert aggregated.loc[aggregated['region'] == 'West', 'sales'].values[0] == 1500
      # 没有销售数据的测试
      test_data = pd.DataFrame({
          'region': ['North', 'South', 'East', 'West'],
          'sales': [0, 0, 0, 0]
      })
      
      aggregated = aggregate_sales(test_data)
      
      assert aggregated.loc[aggregated['region'] == 'North', 'sales'].values[0] == 0
      assert aggregated.loc[aggregated['region'] == 'South', 'sales'].values[0] == 0
      assert aggregated.loc[aggregated['region'] == 'East', 'sales'].values[0] == 0
      assert aggregated.loc[aggregated['region'] == 'West', 'sales'].values[0] == 0

感谢每一个认真阅读我文章的人,礼尚往来总是要有的,虽然不是什么很值钱的东西,如果你用得到的话可以直接拿走:

这些资料,对于【软件测试】的朋友来说应该是最全面最完整的备战仓库,这个仓库也陪伴上万个测试工程师们走过最艰难的路程,希望也能帮助到你!有需要的小伙伴可以点击下方小卡片领取 

更多推荐

Time-distributed 的理解

“Time-distributed”是一种用于深度学习处理序列数据的技术,它将神经网络中的层或网络独立地应用于序列的每个时间步长。在典型的前馈神经网络中,输入数据会被馈送到网络中,并且相同的权重会被应用于所有的输入特征。但是,当处理序列数据,如时间序列或自然语言时,我们需要在每个时间步长上应用相同的权重来捕捉时间信息。

网页采集器-免费的网页采集器

在互联网上,蕴藏着无穷无尽的信息宝藏,无论您是一名学生、研究人员、市场分析师还是企业家,都需要从网络上搜集各种信息来支持您的工作和决策。然而,互联网上的信息千差万别,分散在不同的网站和页面上,如何高效地采集和整理这些信息?免费全自动采集发布批量管理网站工具-147SEO​www.147seo.com/post/1196

文档信息抽取技术:从非结构化文本到结构化信息的旅程

文档信息抽取技术是一种将非结构化文本转化为结构化信息的技术。这种技术可以从各类文档中自动提取出如实体、关系和其他重要信息,并将它们转化为方便计算机进一步处理和分析的格式。技术点包括:1.文本预处理:对文档进行清洗和预处理,这包括统一字符编码、消除冗余和重复内容、去除特殊字符和HTML标签、处理拼写错误、进行分词、识别和

Linux工具(一)

前言:Linux是一个开源的操作系统,它拥有庞大而活跃的开发社区,为用户提供了丰富多样的工具和应用程序。这些工具不仅适用于系统管理员和开发人员,也适用于普通用户,可以帮助他们完成各种任务,从简单的文件管理到复杂的系统配置。从本文开始,我们将系列学习五个Linux的入门开发工具,本期我们先来介绍两个工具:yum和vim工

ETHERNET IP站转MODBUS RTU协议网

产品介绍JM-EIP-RTU是自主研发的一款ETHERNET/IP从站功能的通讯网关。该产品主要功能是将各种MODBUS-RTU设备接入到ETHERNET/IP网络中。JM-EIP-RTU连接到ETHERNET/IP总线中做为从站使用,连接到MODBUS-RTU总线中做为主站或从站使用。产品参数技术参数l网关做为ETH

MySQL数据库描述以及安装使用

一:数据库介绍数据库数据库就是用来存储数据的一种特殊文件。数据库类别数据库主要分为两种:关系型数据库RDBMS非关系型数据库关系型数据库的主要产品:oracle:在以前的大型项目中使用,银行,电信等项目mysql:web时代使用最广泛的关系型数据库mssqlserver:在微软的项目中使用sqlite:轻量级数据库,主

Linux线程

1.进程是资源管理的最小单位,线程是程序执行的最小单位。2.每个进程有自己的数据段、代码段和堆栈段。线程通常叫做轻型的进程,它包含独立的栈和CPU寄存器状态,线程是进程的一条执行路径,每个线程共享其所附属进程的所有资源,包括打开的文件、内存页面、信号标识及动态分配的内存等。3.因为线程和进程比起来很小,所以相对来说,线

Java抽象类和普通类区别、 数组跟List的区别

抽象类Java中的抽象类是一种特殊的类,它不能被实例化,只能被继承。抽象类通常用于定义一些通用的属性和方法,但是这些方法的具体实现需要在子类中完成。抽象类中可以包含抽象方法和非抽象方法。抽象方法是一种没有实现的方法,只有方法的声明,没有方法体。抽象方法必须在抽象类中声明,而且子类必须实现这些抽象方法。如果子类没有实现抽

Linux sed

1.sed介绍sed:StreamEditor,流编辑器、行编辑器、逐行编辑sed将每行内容读入到“内存”中,在内存中进行处理,将结果返回给屏幕,此段内存空间称为模式空间。sed默认不编辑原文件,仅对模式空间的数据进行处理,处理结束后,将模式空间的内容显示到屏幕2.sed语法sed命令的语法格式sed[option]s

2023研究生数学建模竞赛A题B题C题D题E题F题思路+模型+代码

目录1.A题B题C题D题E题F题思路模型:9.22早上比赛开始后,第一时间更新,获取见文末名片2.竞赛注意事项:包括比赛流程,任务分配,时间把控,论文润色,已经发布在文末名片中3.常用国赛数学建模算法3.1分类问题3.2优化问题4.获取赛题思路模型见此名片1.A题B题C题D题E题F题思路模型:9.22早上比赛开始后,第

【Java】泛型 之 使用泛型

使用ArrayList时,如果不定义泛型类型时,泛型类型实际上就是Object://编译器警告:Listlist=newArrayList();list.add("Hello");list.add("World");Stringfirst=(String)list.get(0);Stringsecond=(String

热文推荐