有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java线程相互阻塞

我收到了文件下载请求,每次下载都在不同的线程中进行,直到超过池大小。下载完成后,处理器处理下载的项目。下载任务不会同时运行。除了任务的同步部分之外,还有什么原因会导致这种情况。getDownloadTask()。下载()或任务。getProcessTask()。进程(即使没有同步部件)

下载任务处理器

public class DownloadTaskEnqueuer {
    private static final BlockingQueue<Task> downloadQueue = new LinkedBlockingQueue<>();
    private static final BlockingQueue<Task> processQueue = new LinkedBlockingQueue<>();
    private static final ExecutorService executor = Executors.newCachedThreadPool();

    public void offer(Task task) {
        return downloadQueue.offer(task);
    }

    public void createPool(int size) {
        for (int i = 0; i < size; i++) {
            executor.execute(new DownloadTask(downloadQueue, processQueue);
            executor.execute(new ProcessTask(processQueue));
        }
    }
}

下载任务

public class DownloadTask implements Runnable {
    private BlockingQueue<Task> downloadQueue;
    private BlockingQueue<Task> processQueue;
    
    // constructor for initing two queue

    public void offer(Task task) {
        return processQueue.offer(task);
    }
    
    @Override
    public void run() {
        while (true) {
           Task task = downloadQueue.poll();
           if (task != null) {
               task.getDownloadTask().download();
               offer(task);    
           } else {
               // sleep 250 ms 
           }
        }
    }
}

处理任务

public class ProcessTask implements Runnable {
    private BlockingQueue<Task> processQueue;
    
    // constructor for initing queue
    
    @Override
    public void run() {
        while (true) {
           Task task = processQueue.poll();
           if (task != null) {
                task.getProcessTask().process();
           } else {
               // sleep 250 ms 
           }
        }
    }
}

用例(伪)

createPool(10);

listener.listen((task) -> {
    downloadTaskEnqueuer.offer(task);
}

共 (1) 个答案

  1. # 1 楼答案

    有一些设计错误。每次你写线程时。sleep()比您实现的忙等待模式更有效。不建议这样做。 试试这个:

    public class DownloadTaskEnqueuer implements Runnable {
    private final BlockingQueue<Task>           queue       = new LinkedBlockingQueue<>();
    private final LinkedList<Future<Object>>    results     = new LinkedList<>();
    private final Semaphore                     lock        = new Semaphore(0);
    boolean                                     isRunning   = true;
    private final ExecutorService               executor;
    
    public DownloadTaskEnqueuer(int parallismCount) {
        executor = Executors.newFixedThreadPool(parallismCount);
    }
    
    public void offer(Task task) {
        queue.offer(task);
        lock.release();
    }
    
    @Override
    public void run() {
        while (isRunning) {
            lock.acquireUninterruptibly();
            Task task = queue.poll();
            Future<Object> futureResult = executor.submit(task);
            results.add(futureResult);
        }
    
    }
    
    public static void main(String[] args) {
        DownloadTaskEnqueuer dte = new DownloadTaskEnqueuer(10);
        Thread t = new Thread(dte);
        t.start();
    }
    
    
    public abstract class Task implements Callable<Object> {
    
    abstract Object download();
    
    abstract Object doProcess(Object downloadedObject);
    
    @Override
    public Object call() throws Exception {
        Object downloadedObject = download();
        Object processingResult = doProcess(downloadedObject);
        return processingResult;
    }
    

    }