InfluxDB时序数据库安装和使用

2023-09-15 16:34:37

安装

下载

 wget https://dl.influxdata.com/influxdb/releases/influxdb2-2.4.0-linux-amd64.tar.gz

安装(没有/opt/module/目录的话先创建)

tar -zxvf influxdb2-2.4.0-linux-amd64.tar.gz -C /opt/module/

运行

cd /opt/module/influxdb2_linux_amd64/
./influxd 

在这里插入图片描述
访问页面
http://192.168.233.128:8086/
看到下面页面表示influxdb启动成功~
在这里插入图片描述
初始化 设置用户密码 组织结构 存储桶
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

Load Data( 数据来源 )

行协议

在这里插入图片描述

使用telegraf获取数据来源

1、新建存储桶example02
在这里插入图片描述
2、创建一个配置
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
3、下载安装telegraf

 wget https://dl.influxdata.com/telegraf/releases/telegraf-1.23.4_linux_amd64.tar.gz
 tar -zxvf telegraf-1.23.4_linux_amd64.tar.gz -C /opt/module/
 cd /opt/module/telegraf-1.23.4/usr/bin

复制API Token 到/opt/module/telegraf-1.23.4/usr/bin目录下粘贴,按回车
在这里插入图片描述
在这里插入图片描述
env命令就能看到这个环境变量
在这里插入图片描述
启动telegraf
在这里插入图片描述
在这里插入图片描述

然后在控制Data Explorer面板就能看到从telegraf发送到InfluxDB的数据
在这里插入图片描述

使用Scrapers获取数据来源

1、下载安装启动 普罗米修斯

wget https://github.com/prometheus/node_exporter/releases/download/v1.6.1/node_exporter-1.6.1.linux-amd64.tar.gz
tar -zxvf node_exporter-1.6.1.linux-amd64.tar.gz -C /opt/module/
cd /opt/module/node_exporter-1.6.1.linux-amd64
./node_exporter 

在这里插入图片描述
访问:http://192.168.233.128:9100/metrics
就能看到监控系统情况数据指标
在这里插入图片描述
2、创建新的存储桶example03
在这里插入图片描述
3、创建新的Scraper
在这里插入图片描述
在这里插入图片描述

Java操作InfluxDB

controller

package cn.tedu.springboot_quick.controller;


import cn.tedu.springboot_quick.InfluxDB.InfluxDbMapper;
import cn.tedu.springboot_quick.InfluxDB.MeasurementCpu;
import cn.tedu.springboot_quick.common.CommonResult;
import cn.tedu.springboot_quick.common.utils.ResultUtil;
import cn.tedu.springboot_quick.entity.Menus;
import cn.tedu.springboot_quick.service.MenusService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * <p>
 *  前端控制器
 * </p>
 *
 * @author zhushagnlin
 * @since 2021-07-12
 */

@Api(tags = "InfluxDB")
@RestController
@RequestMapping("/influx")
public class InfluxDBController {
    @Autowired
    InfluxDbMapper influxDbMapper;

   @ApiOperation(value = "InfluxDB插入",tags = "InfluxDB")
   @GetMapping("/insertInfluxDB")
   public CommonResult insertInfluxDB(){
       // 创建点数据
       Point point1 = Point.measurement("cpu")
               .time(System.currentTimeMillis()+ 8 * 60 * 60 * 1000L, TimeUnit.MILLISECONDS)
               .tag("tag1","tag111")
               .tag("tag2","tag222")
               .addField("value", 80)
               .addField("host", "server1")
               .build();

       Point point2 = Point.measurement("cpu")
               .time(System.currentTimeMillis()+2000+ 8 * 60 * 60 * 1000L, TimeUnit.MILLISECONDS)
               .tag("tag1","tag111")
               .tag("tag2","tag222")
               .addField("value", 65)
               .addField("host", "server2")
               .build();


       // 批量写入点数据
       BatchPoints batchPoints = BatchPoints.database("test1")
               .points(point1,point2)
               .build();

       influxDbMapper.writeBatch(batchPoints);

       return ResultUtil.result();
   }

   @ApiOperation(value = "InfluxDB查询",tags = "InfluxDB")
   @GetMapping("/selectInfluxDB")
   public CommonResult selectInfluxDB(){
       //编写select查询语句
       String queryCmd = "SELECT * FROM cpu";
       List<MeasurementCpu> result = influxDbMapper.query(queryCmd, MeasurementCpu.class);
       return ResultUtil.list(result);
   }

}


InfluxDbMapper

package cn.tedu.springboot_quick.InfluxDB;

import cn.tedu.springboot_quick.config.InfluxConfig;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.springframework.beans.BeanWrapperImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

@Component
public class InfluxDbMapper {

    private final InfluxConfig influxConfig;

    private InfluxDB influxDB;

    @Autowired
    public InfluxDbMapper(InfluxConfig influxConfig) {
        this.influxConfig = influxConfig;
        getInfluxDB();
    }

    /**
     * 获取 influxdb 连接
     */
    public void getInfluxDB() {
        if (influxDB == null) {
            influxDB = InfluxDBFactory.connect(influxConfig.getUrl(),
                    influxConfig.getUsername(),
                    influxConfig.getPassword());
            //设置使用数据库  保证库存在
            influxDB.setDatabase(influxConfig.getDatabase());
            //设置数据库保留策略 保证策略存在
            if (!ObjectUtils.isEmpty(influxConfig.getRetention())) {
                influxDB.setRetentionPolicy(influxConfig.getRetention());
            }
            //配置 写入策略
            if(!ObjectUtils.isEmpty(influxConfig.getEnableBatchTime()) &&
                    !ObjectUtils.isEmpty(influxConfig.getEnableBatchCount())){
                influxDB.enableGzip().enableBatch(influxConfig.getEnableBatchCount(),
                        influxConfig.getEnableBatchTime(),TimeUnit.MILLISECONDS);
            }

        }
    }

    /**
     * 关闭连接
     */
    public void close() {
        if (influxDB != null) {
            influxDB.close();
        }
    }

    /**
     * 指定时间插入
     *
     * @param measurement 表
     * @param tags        标签
     * @param fields      字段
     * @param time        时间
     * @param unit        单位
     */
    public void write(String measurement, Map<String, String> tags, Map<String, Object> fields, long time, TimeUnit unit) {
        Point point = Point.measurement(measurement).tag(tags).fields(fields).time(time, unit).build();
        influxDB.write(point);
        close();
    }

    /**
     * 插入数据-自动生成时间
     *
     * @param measurement 表
     * @param tags        标签
     * @param fields      字段
     */
    public void write(String measurement, Map<String, String> tags, Map<String, Object> fields) {
        write(measurement, tags, fields, System.currentTimeMillis() + 8 * 60 * 60 * 1000L, TimeUnit.MILLISECONDS);
    }

    /**
     * 批量插入
     *
     * @param points 批量记录  推荐 1000 条作为一个批
     */
    public void writeBatch(BatchPoints points) {
        influxDB.write(points);
        close();
    }

    public void writeBatch(List<String> list) {
        influxDB.write(list);
        close();
    }

    /**
     * 用来执行相关操作
     *
     * @param command 执行命令
     * @return 返回结果
     */
    public QueryResult query(String command) {
        return influxDB.query(new Query(command));
    }

    /**
     * 创建数据库
     *
     * @param name 库名
     */
    public void createDataBase(String name) {
        query("create database " + name);
    }

    /**
     * 删除数据库
     *
     * @param name 库名
     */
    public void dropDataBase(String name) {
        query("drop database " + name);
    }

    /**
     * select 查询封装
     *
     * @param queryResult 查询返回结果
     * @param clazz       封装对象类型
     * @param <T>         泛型
     * @return 返回处理回收结果
     */
    public <T> List<T> handleQueryResult(QueryResult queryResult, Class<T> clazz) {
        //0.定义保存结果集合
        List<T> lists = new ArrayList<>();
        //1.获取结果
        List<QueryResult.Result> results = queryResult.getResults();
        //2.遍历结果
        results.forEach(result -> {
            //3.获取 series
            List<QueryResult.Series> seriesList = result.getSeries();
            if (!CollectionUtils.isEmpty(seriesList)) {
                //4.遍历 series
                seriesList.forEach(series -> {
                    //5.获取的所有列
                    List<String> columns = series.getColumns();
                    //6.获取所有值
                    List<List<Object>> values = series.getValues();
                    //7.遍历数据 获取结果
                    for (int i = 0; i < values.size(); i++) {
                        try {
                            //8.根据 clazz 进行封装
                            T instance = clazz.newInstance();
                            //9.通过 spring 框架提供反射类进行处理
                            BeanWrapperImpl beanWrapper = new BeanWrapperImpl(instance);
                            HashMap<String, Object> fields = new HashMap<>();
                            for (int j = 0; j < columns.size(); j++) {
                                String column = columns.get(j);
                                Object val = values.get(i).get(j);
                                if ("time".equals(column)) {
                                    //beanWrapper.setPropertyValue("time", Timestamp.from(ZonedDateTime.parse(String.valueOf(val)).toInstant()).getTime());
                                    beanWrapper.setPropertyValue("time", String.valueOf(val));
                                } else {
                                    //保存当前列和值到 field map 中 //注意: 返回结果无须在知道是 tags 还是 fields  认为就是字段和值 可以将所有字段作为 field 进行返回
                                    fields.put(column, val);
                                }
                            }
                            //10.通过反射完成 fields 赋值操作
                            beanWrapper.setPropertyValue("fields", fields);
                            lists.add(instance);
                        } catch (InstantiationException | IllegalAccessException e) {
                            throw new RuntimeException(e);
                        }
                    }
                });
            }
        });
        return lists;
    }

    /**
     * 查询返回指定对象
     *
     * @param selectCommand select 语句
     * @param clazz         类型
     * @param <T>           泛型
     * @return 结果
     */
    public <T> List<T> query(String selectCommand, Class<T> clazz) {
        return handleQueryResult(query(selectCommand), clazz);
    }
}

MeasurementCpu

package cn.tedu.springboot_quick.InfluxDB;

import lombok.Data;
import org.influxdb.annotation.Column;
import org.influxdb.annotation.Measurement;

import java.util.HashMap;

@Data
@Measurement(name = "cpu")
public class MeasurementCpu {
    private HashMap<String, Object> fields;

    @Column(name = "time")
    private String time;
}

2.4版本插入和查询

DemoPOJO

package cn.tedu.springboot_quick.InfluxDB.V24;

import com.influxdb.annotations.Column;
import com.influxdb.annotations.Measurement;

import java.time.Instant;

/**
 * @author: tony
 * @date: 2022/9/23 21:53
 * @description: 这是一个POJO类,对应将POJO类写入InfluxDB
 */
@Measurement(name="temperature")
public class DemoPOJO {

    /** 注意类上的@Measurement注解,我们既可以使用Measurement注解来指定一个POJO类的测量名称
     * 但是使用@Measurement注解会将测量名称直接写死在代码里
     * 当测量名称会在运行时发生改变时,我们可以使用@Column(menasurement=true)注解
     * 这样会将POJO类中被注解的值作为测量名称。
     * **/
    //@Column(measurement = true)
    //String measurementName;

    /** 相当于InfluxDB行协议中的标签集,此处标签的名称将会是location **/
    @Column(tag = true)
    String location;

    /** 相当于InfluxDB行协议中的字段集,此处字段的名称将会是value **/
    @Column
    Double value;

    /** 相当于InfluxDB行协议中的时间戳 **/
    @Column(timestamp = true)
    Instant timestamp;

    /** 全参构造器,方便调用者创建对象 **/
    public DemoPOJO(String location, Double value, Instant timestamp) {
        this.location = location;
        this.value = value;
        this.timestamp = timestamp;
    }
}

Write1

package cn.tedu.springboot_quick.InfluxDB.V24;

import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.WriteApiBlocking;
import com.influxdb.client.domain.WritePrecision;

import java.time.Instant;

/**
 * @author: tony
 * @date: 2022/9/23 3:23
 * @description: 这里是使用同步方式向InfluxDB写入数据的示例代码
 *
 */
public class Write1 {

    /** tony 操作时需要换成自己的 **/
    private static char[] token = "JGk2wps3dHXbxKnME8JKB6pyT05iw3vjshSjd3TaB967v6w2HTXRqWdUhNuPyDtJOFTvUWb6fuhb8VwfZ9uW3g==".toCharArray();

    /** 组织名称 操作时需要换成自己的 **/
    private static String org = "hit";

    /** 存储桶名称 **/
    private static String bucket = "example_java";

    /** InfluxDB服务的url **/
    private static String url = "http://localhost:8086/";

    public static void main(String[] args) {

        InfluxDBClient influxDBClient = InfluxDBClientFactory.create(url, token, org, bucket);
        WriteApiBlocking writeApiBlocking = influxDBClient.getWriteApiBlocking();

        // 0. 使用InflxuDB行协议写入
//        writeApiBlocking.writeRecord(WritePrecision.MS,"temperature,location=north value=50");

        // 1. 使用Point写入
//        Point point = Point.measurement("temperature")
//                .addTag("location", "west")
//                .addField("value", 38.0)
//                .time(Instant.now(),WritePrecision.NS)
//                ;
//        writeApiBlocking.writePoint(point);

        // 2. 使用POJO类写入
        DemoPOJO demoPOJO = new DemoPOJO("east", 22.2, Instant.now());
        writeApiBlocking.writeMeasurement(WritePrecision.MS,demoPOJO);

        // 3. 调用close方法会关闭并释放一些比如守护线程之类的对象。
        influxDBClient.close();
    }
}

AsyncWrite

package cn.tedu.springboot_quick.InfluxDB.V24;

import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.WriteApi;
import com.influxdb.client.WriteOptions;
import com.influxdb.client.domain.WritePrecision;

import java.time.Instant;

/**
 * @author: tony
 * @date: 2022/9/23 22:46
 * @description: 通过异步的方式向InfluxDB写入数据
 */
public class AsyncWrite {

    /** token 操作时需要换成自己的 **/
    private static final char[] token = "JGk2wps3dHXbxKnME8JKB6pyT05iw3vjshSjd3TaB967v6w2HTXRqWdUhNuPyDtJOFTvUWb6fuhb8VwfZ9uW3g==".toCharArray();

    /** 组织名称 操作时需要换成自己的 **/
    private static String org = "hit";

    /** 存储桶名称 **/
    private static String bucket = "example_java";

    /** InfluxDB服务的url **/
    private static String url = "http://localhost:8086/";

    public static void main(String[] args) {
        // 0.创建InfluxDB的客户端
        InfluxDBClient influxDBClient = InfluxDBClientFactory.create(url, token, org, bucket);
        // 1.异步写入会创建一个守护线程,所以在makWriteApi时可以传递一些配置项,也就是WriteOptions对象
        WriteOptions options = WriteOptions.builder()
                .batchSize(999)
                .flushInterval(10000)
                .build();
        // 2.使用makeWriteApi创建的
        WriteApi writeApi = influxDBClient.makeWriteApi(options);
        for (int i = 0; i < 999; i++) {
            DemoPOJO demoPOJO = new DemoPOJO("south", 33.3, Instant.now());
            writeApi.writeMeasurement(WritePrecision.MS,demoPOJO);
        }

        // 3.关闭连接,此方法会触发一次刷写,将缓冲区中剩下的数据向InfluxDB写入一次。
        influxDBClient.close();
    }
}

Query

package cn.tedu.springboot_quick.InfluxDB.V24;

import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.QueryApi;
import com.influxdb.query.FluxRecord;
import com.influxdb.query.FluxTable;

import java.util.List;
import java.util.Map;

/**
 * @author: tony
 * @date: 2022/9/24 1:07
 * @description: 这是关于从InfluxDB查询数据的代码
 */
public class Query {

    /** token 操作时需要换成自己的 **/
    private static char[] token = "JGk2wps3dHXbxKnME8JKB6pyT05iw3vjshSjd3TaB967v6w2HTXRqWdUhNuPyDtJOFTvUWb6fuhb8VwfZ9uW3g==".toCharArray();

    /** 组织名称 **/
    private static String org = "hit";

    /** InfluxDB服务提供的url **/
    private static String url = "http://localhost:8086/";

    public static void main(String[] args) {

        // 0.创建InfluxDB客户端对象
        InfluxDBClient influxDBClient = InfluxDBClientFactory.create(url, token, org);

        // 1.获取查询API对象
        QueryApi queryApi = influxDBClient.getQueryApi();

        // 2.这个flux脚本会查询test存储桶中的go_goroutines测量,这个测量下只有一个序列
        String flux = "from(bucket: \"test\")\n" +
                "  |> range(start: -1h)\n" +
                "  |> filter(fn: (r) => r[\"_measurement\"] == \"go_goroutines\")\n" +
                "  |> aggregateWindow(every: 10s, fn: mean, createEmpty: false)\n" +
                "  |> yield(name: \"mean\")";

        // 3.这个flux脚本会查询example02存储桶中的cpu测量,指定字段名称为usage_user后,
        String flux2 = "from(bucket: \"test02\")\n" +
                "  |> range(start: -1h)\n" +
                "  |> filter(fn: (r) => r[\"_measurement\"] == \"cpu\")\n" +
                "  |> filter(fn: (r) => r[\"_field\"] == \"usage_user\")\n" +
                "  |> aggregateWindow(every: 10s, fn: mean, createEmpty: false)\n" +
                "  |> yield(name: \"mean\")";

        // 4.使用query方法将会得到一个List<FluxTable>对象,其中每一个FluxTable都对应着一个序列
        List<FluxTable> query = queryApi.query(flux);

        // 5.下面这个for循环会把遍历每个序列,并将这个序列中对应的每一行数据打印出来。
        for (FluxTable fluxTable : query) {
            List<FluxRecord> records = fluxTable.getRecords();
            for (FluxRecord record : records) {
                Map<String, Object> one = record.getValues();
                System.out.println(one);
            }
        }

        // 6.下面的queryRaw方法将会得到一个字符串,字符串中是FLUX拓展的CSV格式的数据
        String data = queryApi.queryRaw(flux2);
        System.out.println(data);

    }
}

更多推荐

C++之智能指针类型转换应用总结(二百二十九)

简介:CSDN博客专家,专注Android/Linux系统,分享多mic语音方案、音视频、编解码等技术,与大家一起成长!优质专栏:Audio工程师进阶系列【原创干货持续更新中……】🚀人生格言:人生从来没有捷径,只有行动才是治疗恐惧和懒惰的唯一良药.更多原创,欢迎关注:Android系统攻城狮1.前言本篇目的:理解C+

CSS选择器练习小游戏

请结合CSS选择器练习小游戏进行阅读(网页的动态效果是没有办法通过静态图片展示的)网址:请点击有些题有多种答案,本文就不一一列出了第一题答案:plate第二题答案:bento第三题答案:#fancy第四题答案:plateapple或者plate>apple第五题答案:#fancypickle第六题答案:.small或者

【校招VIP】产品基础知识之学习渠道

考点介绍:产品经理是一项复合型的工作,需要学习掌握的知识很多,能否通过学习掌握并构建一套属于自己的产品知识体系,是成为一名优秀产品经理的重要条件。系统化学习方法,可以分为以下6个步骤:1、明确的学习目标。2、梳理知识框架,画出知识体系的枝干。3、各个击破,逐步学习掌握分支知识。4、在实践中应用,形成产品思维方式。5、在

WMS仓储管理系统:从集成到面向未来的策略

现代供应链需求对WMS仓储管理系统形成了一系列复杂而又严格的要求,并且面临的挑战日益增多。WMS仓储管理系统需要与不同地区、不同业务以及其他的合作伙伴进行交互,从供应商到制造商,再到运输商和客户,一切都需要与WMS系统进行交互。还有,仓库系统本身不是静态的,它即是物理的,又依赖众多的玩家和渠道,它也在不断进化以满足市场

【Java】JDK8 jvm参数配置及说明

参数说明1.堆内存参数设置-Xms或-XX:InitialHeapSize=n设置堆的初始值指令1:-Xms2g指令2:-XX:InitialHeapSize=2048m-Xmx或-XX:MaxHeapSize=n设置堆区最大值指令1:-Xmx2g指令2:-XX:MaxHeapSize=2048m-XX:NewSize

3D目标检测框架 MMDetection3D环境搭建 docker篇

本文介绍如何搭建3D目标检测框架,使用docker快速搭建MMDetection3D的开发环境,实现视觉3D目标检测、点云3D目标检测、多模态3D目标检测等等。需要大家提前安装好docker,并且docker版本>=19.03。1、下载MMDetection3D源码https://github.com/open-mml

数据结构 第二章作业 线性表 西安石油大学

在顺序表中插入和删除一个结点需平均移动多少个结点?具体的移动次数取决于哪两个因素?在顺序表中插入和删除一个结点时,平均移动的结点数量取决于两个因素:插入/删除位置和当前顺序表的长度。插入/删除位置:如果要在顺序表的开头或末尾进行插入/删除操作,不需要移动其他结点,所以移动的结点数量较少。但是,如果要在顺序表的中间位置进

代理IP和Socks5代理:跨界电商与全球爬虫的关键技术

跨界电商在全球化市场中崭露头角,而代理IP和Socks5代理则成为实现全球市场洞察和数据采集的不可或缺的工具。本文将深入探讨这两种代理技术在跨界电商、爬虫技术和出海战略中的关键作用。引言:介绍跨界电商的崛起和全球市场的机遇与挑战。引出代理IP和Socks5代理作为技术解决方案的重要性。代理IP的应用:多地区数据采集:介

转载—Linux下文件搜索、查找、查看命令

Linux下文件搜索、查找、查看命令1、最强大的搜索命令:find查找各种文件的命令2、在文件资料中查找文件:locate3、搜索命令所在的目录及别名信息:which4、搜索命令所在的目录及帮助文档路径:whereis5、在文件中搜寻字符串匹配的行并输出:grep6、分页显示一个文件或任何输出结果:more7、分页显示

SpringCLoud——Docker的基本介绍

什么是Docker项目部署问题大型项目组件较多,运行环境也较为复杂,部署时会碰到一些问题:依赖关系复杂,容易出现兼容性问题开发、测试、生产环境有差异。DockerDocker如何解决依赖的兼容问题的?将应用的LIbs(函数库)、Deps(依赖)、配置与应用一起打包将每个应用放到一个隔离容器去运行,避免互相打扰首先要了解

QT Day2

Second.h#ifndefSECOND_H#defineSECOND_H#include<QWidget>namespaceUi{classSecond;}classSecond:publicQWidget{Q_OBJECTpublicslots:voidjump_slot();//接收跳转信号的槽函数public

热文推荐