有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java start simple 1生产者2通过executor服务消费

public class SemActionPlace {

    public SemMonitor StartConsumerProducer() {
        SemMonitor monitor = new SemMonitor();
        List<Thread> threads = new LinkedList<>();
        Thread p1 = new Thread(new Producer(monitor), "P1");
        p1.start();
        Thread c1 = new Thread(new Consumer(monitor), "C-odd");
        c1.start();
        Thread c2 = new Thread(new Consumer(monitor), "C-even");
        c2.start();
        threads.add(p1);
        threads.add(c1);
        threads.add(c2);
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return monitor;
    }
}

当我通过start()-join()启动线程时,代码工作正常,但是,当我尝试通过executor服务执行相同的操作时,我没有发现错误。保存线程和相互监视器的名称对我来说很重要。请告诉我如何通过executor服务执行线程

下面的代码无法正常工作。错误在哪里

public SemMonitor StartConsumerProducer() {
    SemMonitor monitor = new SemMonitor();
    Thread p1 = new Thread(new Producer(monitor), "P1");
    Thread c1 = new Thread(new Consumer(monitor), "C-odd");
    Thread c2 = new Thread(new Consumer(monitor), "C-even");
    ThreadPoolExecutor service = (ThreadPoolExecutor) Executors.newFixedThreadPool(3);
    service.execute(p1);
    service.execute(c1);
    service.execute(c2);
    System.out.println(service.getCompletedTaskCount());
    System.out.println(service.getCompletedTaskCount());
    return monitor;
}

我需要从executor服务器得到一件简单的事情,就是我希望它像simple start()-join()解决方案一样工作(第一段代码)

class Consumer implements Runnable {

    private final SemMonitor monitor;

    Consumer(SemMonitor monitor) {
        this.monitor = monitor;
    }

    @Override
    public void run() {
        long t = System.currentTimeMillis();
        long end = t + 1000;
        while (System.currentTimeMillis() < end) {
            consoleLog(monitor.activeThreadName,false);
            if (/*monitor.semaphore.tryAcquire() && */monitor.activeThreadName.equals( Thread.currentThread().getName())) {

                try {
                    consoleLog(String.valueOf(Thread.currentThread().getName() + " was notified "),monitor.enableLog);
                    monitor.semaphore.acquire();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                monitor.get(Thread.currentThread().getName());
            }
            try{
            Thread.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        }

    }
}

class Producer implements Runnable {

    private SemMonitor monitor;

    Producer(SemMonitor monitor) {
        this.monitor = monitor;
    }

    @Override
    public void run() {
        String threadNameToWork;
        Integer randNum;
        long t = System.currentTimeMillis();
        long end = t + 500;
        while (System.currentTimeMillis() < end) {
            if (monitor.semaphore.tryAcquire()) {
                randNum = ((Number) (random() * 100)).intValue();
                if (randNum % 2 == 0) {
                    threadNameToWork = "C-even";
                } else {
                    threadNameToWork = "C-odd";
                }
                try {
                    monitor.putItem(randNum, Thread.currentThread().getName(), threadNameToWork);
                    Thread.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

class Monitor {

    private double currItem;
    private boolean isConsumersShouldWaitProducer = true;
    private boolean isConsuming = false;
    private String threadNameToWork;

    synchronized void putRandNumber(double producerOutput, String producerName, String threadNameToWork) {
        if (isConsumersShouldWaitProducer) {
            System.out.println("Consumers wait for new Production");
        }
        this.threadNameToWork = threadNameToWork;
        currItem = producerOutput;
        System.out.println("Producer " + producerName + " putRandNumber Item: " + currItem);
        if (currItem > 3) {
            notifyAll();
            isConsumersShouldWaitProducer = false;
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    synchronized void consumeRandNumber(String threadName) {
        if (isConsumersShouldWaitProducer) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                System.out.println("Caught Interrupted Exception while waiting to consume currItem: " + e.getMessage());
            }
        }
        if (isConsuming) {
            try {
                this.wait();
                isConsuming = true;
            } catch (InterruptedException e) {
                System.out.println("Caught Interrupted Exception while waiting to consume currItem: " + e.getMessage());
            }
        }
        switch (Thread.currentThread().getName()) {
        /*switch (threadNameToWork) {*/
            case "C-odd":
                isConsuming = true;
                if (currItem % 2 != 0 && threadNameToWork.equals(Thread.currentThread().getName())) {
                    consumeItems(threadName);
                }
                isConsuming = false;
                notifyAll();
                try {
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                break;
            case "C-even":
                isConsuming = true;
                if (currItem % 2 == 0 && threadNameToWork.equals(Thread.currentThread().getName())) {
                    consumeItems(threadName);
                }
                isConsuming = false;
                notifyAll();
                try {
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                break;
            default:
                break;
        }
    }

    private synchronized void consumeItems(String threadName) {
        isConsumersShouldWaitProducer = true;
        String randNumType = "*odd/even*";
        System.out.println("Consumer:" + threadName + " consumed " + randNumType + " Items = " + currItem);
        notifyAll();
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

共 (1) 个答案

  1. # 1 楼答案

    你想用线程名做点什么,对吗?在“使用新线程”中创建的线程名不会传递到ExecutorService,但会

    ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
                                           .setNameFormat("thread-%d").build()
    

    然后

    ExecutorService exec = Executors.newSingleThreadExecutor(namedThreadFactory);
    

    现在有了名为thread-1thread-2的线程

    或者在run()方法中设置线程名称

    Thread.currentThread().setName(myName)
    

    要确保线程已完成,请在返回监视器之前添加此线程

    service.shutdown();
    while (!service.awaitTermination(10, TimeUnit.SECONDS)) {
       log.info("Awaiting completion of threads.");
    }