数据融合的并行计算

2023-09-22 15:52:33

1、 数据融合的算法

数据融合的算法当中,需要对每一个格点i进行逐个计算,公式如下
在这里插入图片描述

2、出现的问题

但是随着背景场的空间分辨率的提高,格点数急剧增加。如空间分辨率为0.01°的话,那么15°✖15°的空间范围内就有1500✖1500个格点。那么在进行逐个格点计算的过程中,就非常耗时间。

3、编程

我先按照逐点计算并赋值给DataArray的点的思路进行编程。代码如下:

##对dataarray的每个点进行赋值
def change(da):
    for i in range(len(da.i.values)):
        for j in range(len(da.j.values)):
            da.loc[i,j] = i+j+3
            time.sleep(0.01)
import numpy as np
import xarray as xr
import time
if __name__ == '__main__':
    #构造一个dataarray
    n_wei = 1500
    data = np.random.rand(n_wei, n_wei)
    i = list(range(n_wei))
    j = list(range(n_wei))
    da = xr.DataArray(data, coords=[i, j], dims=["i", "j"])
    print(da)
    ###直接计算
    st0 = time.time()
    change(da)
    print('不并行用时:',time.time()-st0,'秒')

总共用时606秒。

4、Threading

然后为了能够节省时间,想要把逐点计算的第一层循环,即i循环分为好几个并行任务使用并行计算。我在这里使用了threading库,代码如下:

import math
def chunks(arr, m):   #把list arr分成m份,每份分给一个并行进程
    n = int(math.ceil(len(arr) / float(m)))
    return [arr[i:i + n] for i in range(0, len(arr), n)]

#对每个[ilist,j]纬度的dataarray进行change。
def start(da,ilist):
    for i in ilist:
        change(da,i)

##对dataarray的每个点进行赋值
def change(da,i):
    for j in range(len(da.j.values)):
        da.loc[i,j] = i+j+3

import numpy as np
import xarray as xr
import threading
import time
import os
#可能应该用multiprocessing而不是threading
if __name__ == '__main__':
    n_wei = 1500
    data = np.random.rand(n_wei, n_wei)
    i = list(range(n_wei))
    j = list(range(n_wei))
    da = xr.DataArray(data, coords=[i, j], dims=["i", "j"])
    print(da)
    ########开始并行
    st = time.time()
    #计算核数
    max_cpu = os.cpu_count()
    #设置并行数量
    para_n = max_cpu
    #把 i 行分为para_n个
    para_list = chunks(list(range(n_wei)),para_n)
    #设置并行任务
    threads = []
    for n in range(para_n):
        tmp = threading.Thread(target =start,args = (da,para_list[n]) )      ###用threading
        threads.append(tmp)
        tmp.start()
    for t in threads:
        t.join()
    #查看结果
    print(da)
    print('thread总线程数:',para_n,'用时:',time.time()-st,'秒')

总共用时648秒。
为啥用了threading以后反而更慢了呢?我觉得原因可能像这个博文所示。

5、改用array进行赋值

然后,有朋友建议用array进行循环赋值,会比用dataarray进行循环赋值要快。
那么,就试试,直接循环代码如下:

##对dataarray的每个点进行赋值
def change(da):
    for i in range(da.shape[0]):
        for j in range(da.shape[1]):
            da[i,j] = i+j+3

import numpy as np
import xarray as xr
import threading
import time
import multiprocessing
from multiprocessing import Pool
if __name__ == '__main__':
    #构造一个dataarray
    n_wei = 1500
    data = np.random.rand(n_wei, n_wei)
    ###直接计算
    st0 = time.time()
    change(data)
    print('不并行用时:',time.time()-st0,'秒')

用时0.48秒!

用了threading的代码如下:

import math
def chunks(arr, m):   #把list arr分成m份,每份分给一个并行进程
    n = int(math.ceil(len(arr) / float(m)))
    return [arr[i:i + n] for i in range(0, len(arr), n)]

#对每个[ilist,j]纬度的dataarray进行change。
def start(da,ilist,n_wei):
    for i in ilist:
        change(da,i,n_wei)

##对dataarray的每个点进行赋值
def change(da,i,n_wei=1000):
    for j in range(n_wei):
        da[i,j] = i+j+3

import numpy as np
import xarray as xr
import threading
import time
import os
#可能应该用multiprocessing而不是threading,但是multiprocessing不能在jupyter notebook里面用
if __name__ == '__main__':
    n_wei = 1500
    data = np.random.rand(n_wei, n_wei)
    i = list(range(n_wei))
    j = list(range(n_wei))
    print(data)
    ###如果不并行
    st0 = time.time()
    start(data,i,n_wei)
    print('不并行用时:',time.time()-st0,'秒')
    ########开始并行
    st = time.time()
    #计算核数
    max_cpu = os.cpu_count()
    #设置并行数量
    para_n = max_cpu
    #把 i 行分为para_n个
    para_list = chunks(list(range(n_wei)),para_n)
    #设置并行任务
    threads = []
    for n in range(para_n):
        tmp = threading.Thread(target =start,args = (data,para_list[n],n_wei) )      ###用threading
        threads.append(tmp)
        tmp.start()
    for t in threads:
        t.join()
    #查看结果
    print(data)
    print('总线程数:',para_n,'用时:',time.time()-st,'秒')

总共用时0.44秒!
啊,那是不是不用并行了,直接改用array循环赋值就行了?
然后,我把这个思路用在正式的数据融合计算中,结果发现,确实有节省时间,但是还是花费了很久,因为真正的数据融合计算中,每个格点的计算并不是像这个例子里面那么简单。
所以,我还是需要考虑并行计算的问题。接下来,参考和鲸社区的这个帖子进行并行计算的尝试。

6、multiprocessing和pool

首先有一点,jupyter notebook不支持multiprocessing,在jupyter notebook中使用multiprocessing只会卡住。
所以,只能用.py后缀名的文件使用Multiprocessing。
并且,为了测试Multiprocessing的并行计算是否节省了计算时间,在计算的时候,加上time.sleep(0.01)。
代码如下:

#!/usr/bin/env python
# coding: utf-8

# In[1]:

import math
def chunks(arr, m):   #把list arr分成m份,每份分给一个并行进程
    n = int(math.ceil(len(arr) / float(m)))
    return [arr[i:i + n] for i in range(0, len(arr), n)]


# In[2]:


#对每个[ilist,j]纬度的dataarray进行change。
def start(da,ilist):
    for i in ilist:
        change(da,i)


# In[3]:


##对dataarray的每个点进行赋值
def change(da,i):
    for j in range(da.shape[1]):
        da[i,j] = i+j+3
    time.sleep(0.1)



# In[ ]:


import numpy as np
import xarray as xr
import threading
import time
import multiprocessing
import os
from multiprocessing import Pool
#

#可能应该用multiprocessing而不是threading,但是multiprocessing不能在jupyter notebook里面用
if __name__ == '__main__':
    n_wei = 1500
    data = np.random.rand(n_wei, n_wei)
    i = list(range(n_wei))
    j = list(range(n_wei))
    # print(data)
    ###如果不并行
    st0 = time.time()
    start(data,i)
    print('不并行用时:',time.time()-st0,'秒')
    ########开始并行
    data = np.random.rand(n_wei, n_wei)
    st = time.time()
    ###
    max_cpu = os.cpu_count()
    #设置并行数量
    para_n = max_cpu
    #把 i 行分为para_n个
    para_list = chunks(list(range(n_wei)),para_n)
    #设置并行任务
    threads = []
    for n in range(para_n):
        tmp = multiprocessing.Process(target =start,args = (data,para_list[n]) )     ###用multiprocessing
        threads.append(tmp)
        tmp.start()
    for t in threads:
    #     print(threading.current_thread().name)
    #     t.setDaemon(True)
    #     t.start()
        t.join()
    #查看结果
    # print(data)
    print('总线程数:',para_n,'用时:',time.time()-st,'秒')
    # #####用pool试试
    data = np.random.rand(n_wei, n_wei)
    st1 = time.time()
    with Pool(max_cpu) as p:
        p.apply_async(change, args = (data,range(n_wei)) )
    p.close()
    p.join()
    # print(p,data)
    print('用pool:',time.time()-st1,'秒')

得到的结果如下:
在这里插入图片描述

更多推荐

浅谈建筑能耗智能监测平台发展现状及未来趋势

安科瑞华楠摘要:文章以每年发布的上海市国家机关办公建筑和大型公共建筑能耗监测及分析报告变化为切入点,分析了历年能耗分析报告的内容和功能变化;介绍了上海市国家机关办公建筑和大型公共建筑能耗监测平台发展和应用历程;揭示了当下显现的问题,并以问题为导向,预测了未来发展的趋势。关键词:国家机关办公建筑和大型公共建筑;能耗监测;

洁净室/净化车间:洁净等级划分及标准、检测方法及流程解读

无尘车间的发展与现代工业、尖端技术紧密的联系在一起。目前在生物制药、医疗卫生、食品日化、电子光学、能源、精密器械等行业运用已经相当的普遍且成熟。空气洁净度等级(aircleanlinessclass):洁净空间单位体积空气中,以大于或等于被考虑粒径的粒子最大浓度限值进行划分的等级标准。国内按空态、静态、动态对无尘车间进

MySQL详细案例 1:MySQL主从复制与读写分离

文章目录1.MySQL主从复制1.1使用场景1.2MySQL的复制类型1.3主从复制的作用1.4主从复制的工作过程1.5实现MySQL主从复制1.5.1前置准备1.5.2主服务器mysql配置1.5.3从服务器1mysql配置1.5.4从服务器2mysql配置1.5.5测试1.6主从复制的3种同步模式1.6.1异步复制

实时数仓混沌演练实践

一、背景介绍目前实时数仓提供的投放实时指标优先级别越来越重要,不再是单独的报表展示等功能,特别是提供给下游规则引擎的相关数据,直接对投放运营的广告投放产生直接影响,数据延迟或者异常均可能产生直接或者间接的资产损失。从投放管理平台的链路全景图来看,实时数仓是不可或缺的一环,可以快速处理海量数据,并迅速分析出有效信息,同时

Java JVM分析利器JProfiler 结合IDEA使用详细教程

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档文章目录前言一、JProfiler是什么?二、我的环境三、安装步骤1.Idea安装JProfiler插件1.下载程序的安装包四、启动前言对于我们Java程序员而言,肯定需要对项目工程进行JVM监控分析,最终选择jprofiler,它可以远程链接,使用方便,

opencv图像像素类型转换与归一化

文章目录opencv图像像素类型转换与归一化1、为什么对图像像素类型转换与归一化2、在OpenCV中,`convertTo()`和`normalize()`是两个常用的图像处理函数,用于图像像素类型转换和归一化;(1)`convertTo()`函数用于将一个`cv::Mat`对象的像素类型转换为另一种类型。它的基本用法

第8章 MySQL的数据目录

8.1数据库和文件系统的关系像InnoDB、MyISAM这样的存储引擎都是把表存储在磁盘上的,而操作系统用来管理磁盘的又被称为文件系统,所以用专业一点的话来表述就是:像InnoDB、MyISAM这样的存储引擎都是把表存储在文件系统上的。当我们想读取数据的时候,这些存储引擎会从文件系统中把数据读出来返回给我们,当我们想写

公私钥非对称加密 生成和验证JSON Web Token (JWT)

前言这是我在这个网站整理的笔记,关注我,接下来还会持续更新。作者:神的孩子都在歌唱公私钥非对称加密生成和验证JSONWebToken什么是JSONWebToken(JWT)Java程序中生成和验证JWT代码解析什么是JSONWebToken(JWT)JSONWebToken(JWT)是一种轻量级的身份验证和授权机制,由

【Linux 之二】Ubuntu下开发环境的搭建(NFS \ SSH \ FTP \ Smba \ ...)

目前正在进行Linux相关项目的开发,而我的Linux开发是在Ubuntu(版本20.04)下进行的,为此需要搭建很多Linux相关的开发环境,方便工作的进行。这里主要是对各种开发环境的搭建做一个总结记录,方便后面查阅,也方便在Linux开发之路上遇到困难的各位同仁。好了,废话不多说,直接罗列各种开发环境的安装步骤等。

Java 多线程

目录线程相关概念线程基本使用1.继承Thread类,重写run方法示例代码程序示意图2.实现Runnable接口,重写run方法示例代码*应用案例代码理解3.继承Threadvs实现Runnable的区别4.多线程售票问题引出同步互斥问题5.线程终止代码示意图线程常用方法第一组示例代码第二组示例代码用户线程和守护线程示

Google Guava精讲(一)-Guava是什么?能做什么?

https://mvnrepository.com/artifact/com.google.guava/guava作为Java栈的测试工程师,在写代码时候会频繁遇到字符串处理、缓存、反射等问题,我们最常规的做法就是,为了使原生的JDK方法好用,通常是做了一层又一层封装,然后提供整个测试团队使用,而渐渐的就形成了自己的J

热文推荐