助力工业物联网,工业大数据之服务域:Shell调度测试【三十三】

2023-08-17 00:00:00

知识点07:Shell调度测试

  • 目标实现Shell命令的调度测试

  • 实施

    • 需求:使用BashOperator调度执行一条Linux命令

    • 代码

      • 创建

        # 默认的Airflow自动检测工作流程序的文件的目录
        mkdir -p /root/airflow/dags
        cd /root/airflow/dags
        vim first_bash_operator.py
        
      • 开发

      # import
      from airflow import DAG
      from airflow.operators.bash import BashOperator
      from airflow.utils.dates import days_ago
      from datetime import timedelta
      
      # define args
      default_args = {
          'owner': 'airflow',
          'email': ['airflow@example.com'],
          'email_on_failure': True,
          'email_on_retry': True,
          'retries': 1,
          'retry_delay': timedelta(minutes=1),
      }
      
      # define dag
      dag = DAG(
          'first_airflow_dag',
          default_args=default_args,
          description='first airflow task DAG',
          schedule_interval=timedelta(days=1),
          start_date=days_ago(1),
          tags=['itcast_bash'],
      )
      
      # define task1
      run_bash_task = BashOperator(
          task_id='first_bashoperator_task',
          bash_command='echo "hello airflow"',
          dag=dag,
      )
      
      # run the task
      run_bash_task
      
      • 工作中使用bashOperator

        bash_command='sh xxxx.sh'
        
      • xxxx.sh:根据需求

        • Linux命令
        • hive -f
        • spark-sql -f
        • spark-submit python | jar
    • 提交

      python first_bash_operator.py 
      
    • 查看

      image-20211005125514015

    • 执行

      image-20211005125649864

  • 小结

    • 实现Shell命令的调度测试

知识点08:依赖调度测试

  • 目标:实现AirFlow的依赖调度测试

  • 实施

    • 需求:使用BashOperator调度执行多个Task,并构建依赖关系

    • 代码

      • 创建

        cd /root/airflow/dags
        vim second_bash_operator.py
        
      • 开发

        # import
        from datetime import timedelta
        from airflow import DAG
        from airflow.operators.bash import BashOperator
        from airflow.utils.dates import days_ago
        
        # define args
        default_args = {
            'owner': 'airflow',
            'email': ['airflow@example.com'],
            'email_on_failure': True,
            'email_on_retry': True,
            'retries': 1,
            'retry_delay': timedelta(minutes=1),
        }
        
        # define dag
        dag = DAG(
            'second_airflow_dag',
            default_args=default_args,
            description='first airflow task DAG',
            schedule_interval=timedelta(days=1),
            start_date=days_ago(1),
            tags=['itcast_bash'],
        )
        
        # define task1
        say_hello_task = BashOperator(
            task_id='say_hello_task',
            bash_command='echo "start task"',
            dag=dag,
        )
        
        # define task2
        print_date_format_task2 = BashOperator(
            task_id='print_date_format_task2',
            bash_command='date +"%F %T"',
            dag=dag,
        )
        
        # define task3
        print_date_format_task3 = BashOperator(
            task_id='print_date_format_task3',
            bash_command='date +"%F %T"',
            dag=dag,
        )
        
        # define task4
        end_task4 = BashOperator(
            task_id='end_task',
            bash_command='echo "end task"',
            dag=dag,
        )
        
        say_hello_task >> [print_date_format_task2,print_date_format_task3] >> end_task4
        
    • 提交

      python second_bash_operator.py 
      
    • 查看

      image-20211005131800085

  • 小结

    • 实现AirFlow的依赖调度测试

知识点09:Python调度测试

  • 目标实现Python代码的调度测试

  • 实施

    • 需求:调度Python代码Task的运行

    • 代码

      • 创建

        cd /root/airflow/dags
        vim python_etl_airflow.py
        
      • 开发

        # import package
        from airflow import DAG
        from airflow.operators.python import PythonOperator
        from airflow.utils.dates import days_ago
        import json
        
        # define args
        default_args = {
            'owner': 'airflow',
        }
        
        # define the dag
        with DAG(
            'python_etl_dag',
            default_args=default_args,
            description='DATA ETL DAG',
            schedule_interval=None,
            start_date=days_ago(2),
            tags=['itcast'],
        ) as dag:
            # function1
            def extract(**kwargs):
                ti = kwargs['ti']
                data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22, "1004": 606.65, "1005": 777.03}'
                ti.xcom_push('order_data', data_string)
                
            # function2
            def transform(**kwargs):
                ti = kwargs['ti']
                extract_data_string = ti.xcom_pull(task_ids='extract', key='order_data')
                order_data = json.loads(extract_data_string)
                total_order_value = 0
                for value in order_data.values():
                    total_order_value += value
                total_value = {"total_order_value": total_order_value}
                total_value_json_string = json.dumps(total_value)
                ti.xcom_push('total_order_value', total_value_json_string)
                
            # function3
            def load(**kwargs):
                ti = kwargs['ti']
                total_value_string = ti.xcom_pull(task_ids='transform', key='total_order_value')
                total_order_value = json.loads(total_value_string)
                print(total_order_value)
                
            # task1
            extract_task = PythonOperator(
                task_id='extract',
                python_callable=extract,
            )
            extract_task.doc_md = """\
        #### Extract task
        A simple Extract task to get data ready for the rest of the data pipeline.
        In this case, getting data is simulated by reading from a hardcoded JSON string.
        This data is then put into xcom, so that it can be processed by the next task.
        """
        	# task2
            transform_task = PythonOperator(
                task_id='transform',
                python_callable=transform,
            )
            transform_task.doc_md = """\
        #### Transform task
        A simple Transform task which takes in the collection of order data from xcom
        and computes the total order value.
        This computed value is then put into xcom, so that it can be processed by the next task.
        """
        	# task3
            load_task = PythonOperator(
                task_id='load',
                python_callable=load,
            )
            load_task.doc_md = """\
        #### Load task
        A simple Load task which takes in the result of the Transform task, by reading it
        from xcom and instead of saving it to end user review, just prints it out.
        """
        
        # run
        extract_task >> transform_task >> load_task
        
    • 提交

      python python_etl_airflow.py
      
    • 查看

      image-20211005150051298

  • 小结

    • 实现Python代码的调度测试

知识点10:Oracle与MySQL调度方法

  • 目标:了解Oracle与MySQL的调度方法

  • 实施

    • Oracle调度:参考《oracle任务调度详细操作文档.md》

      • step1:本地安装Oracle客户端

      • step2:安装AirFlow集成Oracle库

      • step3:创建Oracle连接

      • step4:开发测试

        query_oracle_task = OracleOperator(
            task_id = 'oracle_operator_task',
            sql = 'select * from ciss4.ciss_base_areas',
            oracle_conn_id = 'oracle-airflow-connection',
            autocommit = True,
            dag=dag
        )
        
    • MySQL调度:《MySQL任务调度详细操作文档.md》

      • step1:本地安装MySQL客户端

      • step2:安装AirFlow集成MySQL库

      • step3:创建MySQL连接

      • step4:开发测试

        • 方式一:指定SQL语句

          query_table_mysql_task = MySqlOperator(
              task_id='query_table_mysql', 
              mysql_conn_id='mysql_airflow_connection', 
              sql=r"""select * from test.test_airflow_mysql_task;""",
              dag=dag
          )
          
          • 方式二:指定SQL文件

            query_table_mysql_task = MySqlOperator(
                task_id='query_table_mysql_second', 
                mysql_conn_id='mysql-airflow-connection', 
                sql='test_airflow_mysql_task.sql',
                dag=dag
            )
            
        • 方式三:指定变量

          insert_sql = r"""
          INSERT INTO `test`.`test_airflow_mysql_task`(`task_name`) VALUES ( 'test airflow mysql task3');
          INSERT INTO `test`.`test_airflow_mysql_task`(`task_name`) VALUES ( 'test airflow mysql task4');
          INSERT INTO `test`.`test_airflow_mysql_task`(`task_name`) VALUES ( 'test airflow mysql task5');
          """
          
          insert_table_mysql_task = MySqlOperator(
              task_id='mysql_operator_insert_task', 
              mysql_conn_id='mysql-airflow-connection', 
              sql=insert_sql,
              dag=dag
          )
          

  • 小结

    • 了解Oracle与MySQL的调度方法

知识点11:大数据组件调度方法

  • 目标:了解大数据组件调度方法

  • 实施

    • AirFlow支持的类型

      • HiveOperator
      • PrestoOperator
      • SparkSqlOperator
    • 需求:Sqoop、MR、Hive、Spark、Flink

    • 解决:统一使用BashOperator或者PythonOperator,将对应程序封装在脚本中

      • Sqoop

        run_sqoop_task = BashOperator(
            task_id='sqoop_task',
            bash_command='sqoop --options-file xxxx.sqoop',
            dag=dag,
        )
        
      • Hive

        run_hive_task = BashOperator(
            task_id='hive_task',
            bash_command='hive -f xxxx.sql',
            dag=dag,
        )
        
      • Spark

        run_spark_task = BashOperator(
            task_id='spark_task',
            bash_command='spark-sql -f xxxx.sql',
            dag=dag,
        )
        
      • Flink

        run_flink_task = BashOperator(
            task_id='flink_task',
            bash_command='flink run /opt/flink-1.12.2/examples/batch/WordCount.jar',
            dag=dag,
        )
        
  • 小结

    • 了解大数据组件调度方法
更多推荐

数据结构——散列函数、散列表

文章目录前言一、散列表的基本概念二、散列函数的构造方法三、处理冲突的方法1.开放定址法:2.拉链法四、散列查找及性能分析总结前言散列表的基本概念散列函数的构造方法处理冲突的方法散列查找及性能分析提示:以下是本篇文章正文内容,下面案例可供参考一、散列表的基本概念概念:之前的算法建立在“比较”基础上,效率取决于比较次数散列

武汉凯迪正大—继电保护测试仪

一、凯迪正大微机继电保护测试仪产品概述KDJB系列微机继电保护校验仪是在参照电力部颁发的《微机型继电保护试验装置技术条件(讨论稿)》的基础上,听取用户意见,总结目前国内同类产品优缺点,充分使用现代的微电子技术和器件实现的一种小型化微机继电保护测试仪。它采用单机独立运行,亦可联接笔记本电脑运行。主机内置新一代高速数字信号

【zookeeper】基于Linux环境安装zookeeper集群

前提,需要有几台linux机器,我们可以准备好诸如finalshell来连接linux并且上传文件;其次Linux需要安装上ssh,并且在/etc/hosts文件中写好其他几台机器的名字和Ip127.0.0.1localhostlocalhost.localdomainlocalhost4localhost4.loca

ChatGLM 大模型外挂(向量)知识库

前言如果我们想往大模型里边注入知识,最先能想到的就是对大模型进行微调。笔者曾实验过,只用几十万量级的数据对大模型进行微调并不能很好的将额外知识注入大模型,笔者在算力这方面囊中羞涩,只有4块卡,这几十万量级的数据训练6B的模型都要训练好几天。。。如果不微调的话,其实还是可以利用外挂数据库的方式让大模型利用额外的知识的,比

Python案例|使用卷积网络对星系图片进行分类

星系动物园(galaxyzoo)是由牛津大学等研究机构组织并邀请公众协助的志愿者科学计划,目的是为超过100万个星系图像进行分类。这是天文学中一次规模浩大的公众星空普查活动,大众参与热情高涨,在近十万名志愿者的积极参与下,只用了175天就完成了第一阶段的星系动物园项目:对95万个星系进行了分类,而且平均每个星系被分类了

Haproxy集群调度器与部署

一、Haproxy介绍:1.Haproxy应用分析:LVS在企业中康复在能力很强,但存在不足:LVS不支持正则处理,不能实现动静分离对于大型网站LVS的事实配置较为复杂,维护成本相对较Haproxy是一款可以供高可用性、负载均衡和基于TCP和HTTP应用的代理软件非常适用于并发大(并发达1w以上)web站点,可保持站点

高阶数据结构(2)-----红黑树(未完成)

一)红黑树的基本概念和基本性质:1)红黑树就是一种高度平衡的二叉搜索树,但是在每一个节点上面都增加了一个存储位来表示结点的颜色,可以是红色或者是黑色,通过对任何一条从根节点到叶子节点上面的路径各个节点着色方式的限制,红黑树会自动确保没有一条路经会比其他路径的长度高出两倍,而是接近平衡的2)红黑树最长路径是最短路径的两倍

vue3 effect.spec

🎬岸边的风:个人主页🔥个人专栏:《VUE》《javaScript》⛺️生活的理想,就是为了理想的生活!目录原型观察的对象的变更会同步到原始对象重复观察相同的原始对象直接返回相同的proxy对象不会污染原始对象通过toRawapi可以返回被观察对象的原始对象shallowReactive结语定义一个对象origina

JavaScript的三大组成部分是什么?JavaScript的核心组成部分解析:语法、BOM和DOM

🌷🍁博主猫头虎带您GotoNewWorld.✨🍁🦄博客首页——猫头虎的博客🎐🐳《面试题大全专栏》文章图文并茂🦕生动形象🦖简单易学!欢迎大家来踩踩~🌺🌊《IDEA开发秘籍专栏》学会IDEA常用操作,工作效率翻倍~💐🌊《100天精通Golang(基础入门篇)》学会Golang语言,畅玩云原生,走遍大

Nex.js Web 应用程序 SSG 与 SSR——选择正确的渲染方法

Next.js,一个流行的React框架,改变了开发人员构建现代Web应用程序的方式。它提供了强大的功能,例如服务器端渲染(SSR)和静态站点生成(SSG),可优化应用程序的性能和用户体验。在这篇博文中,我们将探讨SSG和SSR之间的主要区别、它们的优势、何时选择其中一种方法,以及如何使用AWSAmplify部署这两种

基于ASCON的AEAD

1.引言前序博客:ASCON:以“慢而稳”赢得NIST轻量级加密算法标准密码学中的AEAD(authenticatedencryptionwithassociateddata)对称密钥加密过去数年来已发生改变,具体为:当今主要使用streamciphers,因其比blockciphers要快得多。经常会使用AEAD(A

热文推荐