Java多线程编程-线程间协作wait/notify

2023-09-22 15:48:59

前言:

本文是基于《Java多线程编程实战指南》第五章个人理解,源码是摘抄作者的源码,源码会加上自己的理解。

等待与通知:wait/notify

单线程的变成中,如果程序需要在满足一定条件间下操作一个目标动作,就需要if语句,而在多线程中处理这种情况,保护条件可能只是暂时的,稍后其他线程可能更新了保护条件设计共享变量而使其成立,因此可以将当前线程暂停,知道保护条件得以成立时将其唤醒继续操作。伪代码如下:

//原子操作
atomic {

   while(保护条件不成立) {
        暂停当前线程;
   }

   doAction();
}

如果写成Java代码形式:

synchronized(someObject) {
    while(保护条件不成立) {
        someObject.wait();
   }
   doAction();
}

这边我个人理解,注意点有一个,就是上述操作需要具有原子性。

换个角度思考,如果上述操作不具有原子性会怎么样(即把synchronize这行注释掉),把步骤拆解:

1.等代码运行到someObject.wait()时候,会暂停当前线程,并释放所持有的someObject内部锁,线程生命周期会进入waiting状态,这时候someObject.wait()语句是不会返回,直到别的线程调用someObject.notifiy().

2.当别的线程调用someObject.notify()时候并且更新保护条件,notify会唤醒等待线程,someObject.wai()会申请someObject的内部锁,持有内部锁,该语句会返回。

3.while重新判断保护条件,但是因为操作不具有原子性,也就是在第二步到第三步过程中,有可能其他线程更改了保护条件,使得while条件再次不成立,所以重新在次进入wait语句。

4.同理doAction也是同样的道理,必须保证在执行目标动作之前,保护条件是成立状态,不然可能在执行doAction前一刻,其他线程对共享更新使得保护条件重新不成立。

因此上述while语句判断,以及doAction,以及wait调用需要放在同一个对象锁引导的临界区中。

而用Object.notify()通知,如下面伪代码:

synchronized(someObject){
    //更新等待线程的保护条件设计的共享变量
    updateShareState();

    //唤醒其他线程
    someObject.notify();
}

包含了更新共享变量和唤醒其他线程,由于一个线程只有持有一个对象的内部锁的情况下才能执行对象的notify,这也是为什么刚才说的第一步wait语句是会释放对象内部锁的,不然notify是无法调用的,详细参考下面wait内部实现伪代码。

notify还有一点需要注意就是,尽可能放在临界区结束的地方,也就是上面代码它在靠近结束的花括号之前的一句,这是因为当调用notify后,等待线程会被唤醒,但是notify本身并不会释放内部锁,所以如果它不靠近的话,等待线程可能又会拿不到内部锁,再次被暂停。

wait内部实现伪代码

Public void wait() {

    //执行线程必须持有当前对象对应的内部锁
    if(!Thread.holdsLock(this)){
        Throws new IllegalMonitorStateException();
    }

    if(当前对象不在等待集中){
    //将当前线程加入当前对象等待集中

       addToWaitSet(Thread.currentThread());
    }

    atomic{//原子操作开始
        //释放当前对象的内部锁

        releaseLock(this);
        //暂停当前线程
        block(Thread.currentThread());

    }//原子操作结束

    //再次申请当前对象的内部锁
    acquireLock(this);
    //将当前线程从当前对象的等待及中移除
    removeFromWaitSet(Thread.currentThread());
    return;//返回
}

实战案例

书里附了一个wait/notify的实战案例,其实我个人觉得这个实战案例不太好

案例是某分布式系统有个告警系统,将这些告警信息通过网络连接上报发送到告警服务器上,

AlarmAgent内部维护两个工作线程:一个工作线程负责与告警服务器建立网络连接,为网络连接线程,另个工作线程负责定时检查告警代理与告警服务器网络连接情况,为心跳线程

public class CaseRunner5_1 {
  final static AlarmAgent alarmAgent;
  static {
    alarmAgent = AlarmAgent.getInstance();
    alarmAgent.init();
  }

  public static void main(String[] args) throws InterruptedException {

    alarmAgent.sendAlarm("Database offline!");
    Tools.randomPause(12000);
    alarmAgent.sendAlarm("XXX service unreachable!");
  }
}
import java.util.Random;


public class AlarmAgent {
    // 保存该类的唯一实例
    private final static AlarmAgent INSTANCE = new AlarmAgent();
    // 是否连接上告警服务器
    private boolean connectedToServer = false;
    // 心跳线程,用于检测告警代理与告警服务器的网络连接是否正常
    private final HeartbeartThread heartbeatThread = new HeartbeartThread();

    private AlarmAgent() {
        // 什么也不做
    }

    public static AlarmAgent getInstance() {
        return INSTANCE;
    }

    public void init() {
        connectToServer();
        heartbeatThread.setDaemon(true);
        heartbeatThread.start();
    }

    private void connectToServer() {
        // 创建并启动网络连接线程,在该线程中与告警服务器建立连接
        new Thread() {
            @Override
            public void run() {
                doConnect();
            }
        }.start();
    }

    private void doConnect() {
        // 模拟实际操作耗时
        Tools.randomPause(100);
        synchronized (this) {
            connectedToServer = true;
            // 连接已经建立完毕,通知以唤醒告警发送线程
            notify();
        }
    }

    public void sendAlarm(String message) throws InterruptedException {
        synchronized (this) {
            // 使当前线程等待直到告警代理与告警服务器的连接建立完毕或者恢复
            while (!connectedToServer) {

                Debug.info("Alarm agent was not connected to server.");

                wait();
            }

            // 真正将告警消息上报到告警服务器
            doSendAlarm(message);
        }
    }

    private void doSendAlarm(String message) {
        // ...
        Debug.info("Alarm sent:%s", message);
    }

    // 心跳线程
    class HeartbeartThread extends Thread {
        @Override
        public void run() {
            try {
                // 留一定的时间给网络连接线程与告警服务器建立连接
                Thread.sleep(1000);
                while (true) {
                    if (checkConnection()) {
                        connectedToServer = true;
                    } else {
                        connectedToServer = false;
                        Debug.info("Alarm agent was disconnected from server.");

                        // 检测到连接中断,重新建立连接
                        connectToServer();
                    }
                    Thread.sleep(2000);
                }
            } catch (InterruptedException e) {
                // 什么也不做;
            }
        }

        // 检测与告警服务器的网络连接情况
        private boolean checkConnection() {
            boolean isConnected = true;
            final Random random = new Random();

            // 模拟随机性的网络断链
            int rand = random.nextInt(1000);
            if (rand <= 500) {
                isConnected = false;
            }
            return isConnected;
        }
    }
}
import java.io.PrintStream;
import java.text.SimpleDateFormat;
import java.util.Date;

public class Debug {
    private static ThreadLocal<SimpleDateFormat> sdfWrapper = new ThreadLocal<SimpleDateFormat>() {
        @Override
        protected SimpleDateFormat initialValue() {
            return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        }

    };

    enum Label {
        INFO("INFO"),
        ERR("ERROR");
        String name;

        Label(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }
    }

    // public static void info(String message) {
    // printf(Label.INFO, "%s", message);
    // }

    public static void info(String format, Object... args) {
        printf(Label.INFO, format, args);
    }

    public static void info(boolean message) {
        info("%s", message);
    }

    public static void info(int message) {
        info("%d", message);
    }

    public static void error(String message, Object... args) {
        printf(Label.ERR, message, args);
    }

    public static void printf(Label label, String format, Object... args) {
        SimpleDateFormat sdf = sdfWrapper.get();
        @SuppressWarnings("resource")
        final PrintStream ps = label == Label.INFO ? System.out : System.err;
        ps.printf('[' + sdf.format(new Date()) + "][" + label.getName()
                + "]["
                + Thread.currentThread().getName() + "]:" + format + " %n", args);
    }
}
import sun.misc.Unsafe;

import java.io.*;
import java.lang.reflect.Field;
import java.math.BigInteger;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Random;
import java.util.logging.Level;
import java.util.logging.Logger;

public final class Tools {
    private static final Random rnd = new Random();
    private static final Logger LOGGER = Logger.getAnonymousLogger();

    public static void startAndWaitTerminated(Thread... threads)
            throws InterruptedException {
        if (null == threads) {
            throw new IllegalArgumentException("threads is null!");
        }
        for (Thread t : threads) {
            t.start();
        }
        for (Thread t : threads) {
            t.join();
        }
    }

    public static void startThread(Thread... threads) {
        if (null == threads) {
            throw new IllegalArgumentException("threads is null!");
        }
        for (Thread t : threads) {
            t.start();
        }
    }

    public static void startAndWaitTerminated(Iterable<Thread> threads)
            throws InterruptedException {
        if (null == threads) {
            throw new IllegalArgumentException("threads is null!");
        }
        for (Thread t : threads) {
            t.start();
        }
        for (Thread t : threads) {
            t.join();
        }
    }

    public static void randomPause(int maxPauseTime) {
        int sleepTime = rnd.nextInt(maxPauseTime);
        try {
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void randomPause(int maxPauseTime, int minPauseTime) {
        int sleepTime = maxPauseTime == minPauseTime ? minPauseTime : rnd
                .nextInt(maxPauseTime - minPauseTime) + minPauseTime;
        try {
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static Unsafe getUnsafe() {
        try {
            Field f = Unsafe.class.getDeclaredField("theUnsafe");
            ((Field) f).setAccessible(true);
            return (Unsafe) f.get(null);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    public static void silentClose(Closeable... closeable) {
        if (null == closeable) {
            return;
        }
        for (Closeable c : closeable) {
            if (null == c) {
                continue;
            }
            try {
                c.close();
            } catch (Exception ignored) {
            }
        }
    }

    public static void split(String str, String[] result, char delimeter) {
        int partsCount = result.length;
        int posOfDelimeter;
        int fromIndex = 0;
        String recordField;
        int i = 0;
        while (i < partsCount) {
            posOfDelimeter = str.indexOf(delimeter, fromIndex);
            if (-1 == posOfDelimeter) {
                recordField = str.substring(fromIndex);
                result[i] = recordField;
                break;
            }
            recordField = str.substring(fromIndex, posOfDelimeter);
            result[i] = recordField;
            i++;
            fromIndex = posOfDelimeter + 1;
        }
    }

    public static void log(String message) {
        LOGGER.log(Level.INFO, message);
    }

    public static String md5sum(final InputStream in) throws NoSuchAlgorithmException, IOException {
        MessageDigest md = MessageDigest.getInstance("MD5");
        byte[] buf = new byte[1024];
        try (DigestInputStream dis = new DigestInputStream(in, md)) {
            while (-1 != dis.read(buf))
                ;
        }
        byte[] digest = md.digest();
        BigInteger bigInt = new BigInteger(1, digest);
        String checkSum = bigInt.toString(16);

        while (checkSum.length() < 32) {
            checkSum = "0" + checkSum;
        }
        return checkSum;
    }

    public static String md5sum(final File file) throws NoSuchAlgorithmException, IOException {
        return md5sum(new BufferedInputStream(new FileInputStream(file)));
    }

    public static String md5sum(String str) throws NoSuchAlgorithmException, IOException {
        ByteArrayInputStream in = new ByteArrayInputStream(str.getBytes("UTF-8"));
        return md5sum(in);
    }

    public static void delayedAction(String prompt, Runnable action, int delay/* seconds */) {
        Debug.info("%s in %d seconds.", prompt, delay);
        try {
            Thread.sleep(delay * 1000);
        } catch (InterruptedException ignored) {
        }
        action.run();
    }

    public static Object newInstanceOf(String className) throws InstantiationException,
            IllegalAccessException, ClassNotFoundException {
        return Class.forName(className).newInstance();
    }

}

参考文献

《Java多线程编程实战指南-核心篇》

更多推荐

一台PoE交换机可以为多少个设备提供供电?

如今在安防监控领域,许多网络设备都支持PoE供电。在网络监控工程中,为了节省布线成本并提高便捷性,大多数工程商选择使用PoE供电方案,也就是使用PoE交换机为监控摄像头提供电力。那么,一台功率输出以太网(PoE)工业交换机能够为多少个设备提供电力呢?1、PoE交换机供电标准对于初次接触PoE供电的朋友来说,可能对PoE

使用Selenium进行网页登录和会话管理

随着互联网的快速发展,网页登录和会话管理是许多网站和应用程序的基本功能。通过网页登录,用户可以访问个人账户、购物车订单、历史记录等个性化信息。为了提高用户体验和效率,自动化登录和会话管理成为一个重要的需求。而Selenium作为一种强大的Web自动化工具,为开发人员提供了便捷的方式来实现这些功能。例如我们在访问京东网站

LC926. 将字符串翻转到单调递增(JAVA - 动态规划)

将字符串翻转到单调递增题目描述动态规划题目描述难度-中等LC926.将字符串翻转到单调递增(JAVA-动态规划)如果一个二进制字符串,是以一些0(可能没有0)后面跟着一些1(也可能没有1)的形式组成的,那么该字符串是单调递增的。给你一个二进制字符串s,你可以将任何0翻转为1或者将1翻转为0。返回使s单调递增的最小翻转次

@SpringBootApplication注解说明(InsCode AI 创作助手)

@SpringBootApplication是SpringBoot中的一个关键注解,用于标识一个类是SpringBoot应用程序的主应用程序类。在这篇文章中,我们将详细解释@SpringBootApplication注解以及它在SpringBoot应用程序中的作用。@SpringBootApplication注解的作用

Docker容器数据持久化存储机制

这里写目录标题一、Docker容器数据持久化存储介绍二、Docker容器数据持久化存储方式三、Docker容器数据持久化存储方式应用案例演示3.1dockerrun-v3.1.1创建了本地目录3.1.2未创建本地目录3.2volumes3.2.1创建数据卷3.2.2使用数据卷一、Docker容器数据持久化存储介绍物理机

记录一次久远git仓库迁移

因为项目需求,公司收购了一个第三方的项目,包含源码。需要将该项目上传到我们公司自己的gitlab上。没过脑子一顿操作://查询远程链接gitremote-v//删除原有链接gitremotermorigin//如果需要修改分支名gitbranch-moldnamenewname//修改为自己的gitlab地址gitre

美创科技参编《数字政府建设与发展研究报告(2023)》 正式发布

9月14日,中国信息通信研究院云计算与大数据研究所牵头编制的《数字政府建设与发展研究报告(2023)》正式发布。美创科技结合在政务数据安全领域的丰富实践经验,参与报告编写。《数字政府建设与发展研究报告》以“技术、业务、数据融合发展路径探索”为主题,剖析当前数字政府建设时代内涵,梳理当前数字政府建设与发展现状,从技术、业

算法基础之二分查找

原题链接一、二分查找中的mid+1和mid-1的问题二分查找中的边界问题处理不好很容易导致死循环和计算错误的问题,以题目数的范围为例。题目大意​二分查找重复数第一次出现的位置和最后一次出现的位置。数学含义​第一次位置即找到一个长度最大的>=X区间的左边界​最后一次位置即找到一个长度最大的>=X区间的右边界注意找的目标是

大数据——Spark SQL

1、SparkSQL是什么SparkSQL是Spark中用于处理结构化数据的一个模块,前身是Shark,但本身继承了前身Hive兼容和内存列存储的一些优点。SparkSQL具有以下四个特点:综合性(Integrated):Spark中可以加入SQL查询,也可以使用DataFrameAPI,其中API提供了多种语言选择,

[C++随笔录] vector模拟实现

vector模拟实现基本结构天选之子构造拷贝构造析构operator=空间reserveresizesize&&capacity增insertpush_back删erasepop_back查&&改swapoperator[]源码基本结构//可以是不同类型,用类模板template<classT>classvector{

产品解读 | 分布式多模数据库:KaiwuDB

1.KaiwuDB是什么?KaiwuDB是由浪潮创新研发的一款分布式、多模融合,支持原生AI的数据库产品,拥有“就地计算”等核心技术,具备高速写入、极速查询、SQL支持、随需压缩、智能预计算、订阅发布、集群部署等特性,具有稳定安全、高可用、易运维等特点。2.KaiwuDB设计理念在当今数据爆炸的时代,企业和组织面临着如

热文推荐