有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java如何创建相位器链/层

我正在编写多线程应用程序,它使用Phaser来知道何时完成工作。问题是,在ExecutorCompletionService中,一个队列中甚至可能有100k个线程,但Phaser中未驱动方的最大数量是65535。当65536派对到来时,我能做什么

我的示例代码:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

public class Main {
public static void main(String[] args) throws Exception {
    ExecutorService ec = Executors.newFixedThreadPool(10);
    ExecutorCompletionService<List<String>> ecs = new ExecutorCompletionService<List<String>>(
            ec);
    Phaser phaser = new Phaser();

    // register first node/thread
    ecs.submit(new SimpleParser("startfile.txt"));
    phaser.register();

    Future<List<String>> future;
    do {
        future = ecs.poll();
        if(future!=null && future.get() != null) {
            addParties(phaser, future.get(), ecs);
            phaser.arriveAndDeregister();
        }

        if (phaser.isTerminated()) {
            ec.shutdown();
        }
    } while (!ec.isShutdown() && !phaser.isTerminated());
}

public static void addParties(Phaser p, List<String> filenames,
        ExecutorCompletionService<List<String>> ecs) {
    for (int i = 0; i < filenames.size(); i++) {
        ecs.submit(new SimpleParser(filenames.get(i)));
        //PROBLEM = What to do when Phaser has 65535+ unarrived parties
        p.register();
    }
}

static class SimpleParser implements Callable<List<String>> {

    String fileName;

    public SimpleParser(String fileName) {
        this.fileName = fileName;
    }

    @Override
    public List<String> call() throws Exception {
        return parseFile();
    }

    private List<String> parseFile() {
        return new ArrayList<String>(Arrays.asList(new String[] {
                "somefilename1.txt", "somefilename2.txt" }));
    }

}
}

问题出在addParties()方法中。单线程(SimpleParser)可以返回100个新文件名,将有100个新线程提交给ExecutorCompletionService,以及在Phaser中注册的100个新参与方。 我试着用这样的方法:

if(p.getUnarrivedParties() == 65535)
            p = new Phaser(p);

创建一个相量链,但没有帮助,因为p.getUnarrivedParties()返回0,但我无法注册它的下一方

    System.out.println(p.getUnarrivedParties());
        if(p.getUnarrivedParties() == 65535) {
            p = new Phaser(p);
            System.out.println(p.getUnarrivedParties());
        }
        p.register();

印刷品:

65535

0

并抛出非法的例外

那么,我如何创建新的相量器,它将与旧的相量器连接

//编辑

谢谢@bowmore。 我还有两个问题

让我们来看一个例子:

import java.util.concurrent.Phaser;

public class Test2 {
    public static void main(String[] args) {
        Phaser parent = new Phaser();
        Phaser child1 = new Phaser(parent);
        Phaser child2 = new Phaser(parent);
        child1.register();
        child2.register();

        System.out.println("Parent: "+parent.isTerminated());
        System.out.println("Child1: "+child1.isTerminated());
        System.out.println("Child2: "+child1.isTerminated()+"\n");

        child1.arriveAndDeregister();
        System.out.println("Parent: "+parent.isTerminated());
        System.out.println("Child1: "+child1.isTerminated());
        System.out.println("Child2: "+child2.isTerminated()+"\n");

        child2.arriveAndDeregister();
        System.out.println("Parent: "+parent.isTerminated());
        System.out.println("Child1: "+child1.isTerminated());
        System.out.println("Child2: "+child2.isTerminated()+"\n");
    }
}

它打印:

Parent: false
Child1: false
Child2: false

Parent: false
Child1: false
Child2: false

Parent: true
Child1: true
Child2: true

为什么在孩子之后。到达并注销();child1没有终止,如何检查它是否真的终止了

第二个问题。 在达到65535个派对后,我问起创建新的Phaser的问题,因为我认为创建数千个新对象是无用的——你认为这样做不会出现内存问题,或者甚至可能提高性能


共 (0) 个答案