有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

面向协作流消费者的java设计思想?

假设出现以下情况:

我们有一个来自某个库的Java类,它消耗字节流,比如某个XML解析器XmlParser1,它公开了一个方法xmlParser1.parse(inputStream);这种方法通常会在一次调用中吃掉所有字节,最终导致阻塞。 我们还有另一个来自其他库的类,它做了类似的事情,使用不同的实现:XmlParser2with xmlParser2.parse(inputStream)。 现在,我们想用两个解析器解析一个流

我的第一个答案是:我们完蛋了。由于我们无法控制每个类如何使用流,所以我们所能做的就是缓冲内存中的所有字节或将其存储到临时文件中(或者打开/重新打开它,如果可能的话)。这些消费者的API本质上是不合作的

现在,假设我们对XmlParser1(实现和签名)有控制权,我们希望以更灵活、更合作的方式对其进行编码,以便调用方能够以某种合理、高效的方式实现上述行为。。。你有什么建议

我正在考虑的一些替代方案:

1)使XmlParser1实现FilterInputStream,这样当某个类(XmlParser1)试图从中读取一些字节时,它会在内部解析它必须的内容(迭代地,可能带有一些合理的缓冲),并返回原始字节。(我想说,这与FilterInputStream的概念并不完全对应)。通过这种方式,客户端代码可以简单地链接解析器:

   public class XmlParser1 extends FilterInputStream {
       public XmlParser1(InputStream rawInputStream) { ... } 
       public int read(byte[] b, int off, int l) throws IOException {
           // this would invoke the underlying stream read, parse internall the read bytes,
           // and leave them in the buffer
       }
   }

   XmlParser1 parser1 = new XmlParser1(inputstream);
   XmlParser2 parser2 = new XmlParser2(parse); 
   parser2.parse(); // parser2 consumes all the input stream, which causes parser1 to read an parse it too

2)不要把atXmlParser1看作字节的消费者,而是把它看作接收器:我们不会让它吃掉字节,我们会用勺子喂它。所以,我们可以用 xmlParser1.write(byte[])。。。也就是说,我们将使其成为OutputStream,而不是传递一个InputStream。这将允许客户端创建一个TeeInputStream,将字节透明地传递给XmlParser2类,同时调用XmlParser1.write()

请注意,我们在任何情况下都不需要单独的线程

我不确定哪一个(如果有的话)在概念上更可取,也不确定是否有更好的替代方案。在我看来,这是一个应该已经讨论过的设计问题,但我没有发现太多——不一定局限于Java。欢迎提出意见和参考


共 (2) 个答案

  1. # 1 楼答案

    假设这两个解析器在两个独立的线程中运行,它可能是这样的(不是工作代码)

    public class Test extends FilterInputStream {
        byte[] buf = new byte[8192];
        int len;
        Thread thread = null;
    
        @Override
        public synchronized int read(byte[] b, int off, int l) throws IOException {
            while (thread == Thread.currentThread() && len > 0) {
                thread.wait();
            }
            if (len > 0) {
                System.arraycopy(buf, 0, b, off, l);
                len = 0;
                return l;
            }
            len = super.read(b, off, l);
            System.arraycopy(b, off, buf, 0, len);
            thread = Thread.currentThread();
            notify();
            return len;
        }
    

    也就是说,#1读取字节并将其保存在buf中,#1的下一次尝试将被阻止,直到#2从缓冲区读取所有字节

  2. # 2 楼答案

    如果线程在同一台服务器上,那么分割输入流的想法就没有任何意义。因为仍然只使用一个InputStream和一个BufferedInputStream来获取数据,所以从InputStreams中创建对象,然后在两个不同的运行线程中使用这些对象。结论:在Java中,任何时候都没有必要阻止任何InputStream<我甚至认为它是有害的,因为如果你阻塞了,如果你的缓冲或管道流动,会发生什么?队列溢出

    编辑:如果你想停止一个流,你需要告诉发送者不要再发送任何数据。或者你像youtube那样做,他们将一个视频分成几个部分(即1部分1分钟),然后一次只预加载这些部分,因此停止视频不会影响预加载,因为只有当你到达时间线中的某个位置(比如45秒,1分45秒,2分45秒,aso)时,它才会加载。嗯,这实际上只是一种预加载技术,没有真正的流媒体,这就是为什么Youtube不需要处理数据包丢失。)

    不过,我还有几行伪代码要给你,客户机:

    BufferedOutputStream bos = new BufferedOutputStream(/*yourBasicInputStream*/);
    ObjectOutputStream oos = new ObjectOutputStream(bos);  //Or use another wrapper
    oos.writeObject(yourObjectToSend);      //Or use another parser: Look into the API: ObjectInputStream
    

    主线程控制器(又名服务器)中的类变量:

    Thread thread1;  //e.g. a GUI controller
    Thread thread2;  //e.g. a DB controller
    

    服务器(或服务器启动的另一个服务器线程,以两个线程作为参数):

    BufferedInputStream bis = new BufferedInputStream(/*yourBasicInputStream*/);
    ObjectInputStream ois = new ObjectInputStream(bis);   //Or use another wrapper
    //now we use an interface MyNetObject implementing the method getTarget(), but
    //also an abstract class would be possible (with a more complex getTarget-method):
    MyNetObject netObject = (MyNetObject) ois.readObject();   //Or use another parser...
    if(netObject.getTarget()=="Thread1ClassANDThread2Class"){
        thread1.activateSync(netObject);        //notify...  
        thread2.activateSync(netObject);        //...both threads!
    }
    else if(netObject.getTarget()=="Thread1Class"){
        thread1.activate(netObject);        //give thread1 a notification
    }
    else if(netObject.getTarget()=="Thread2Class"){
        thread2.activate(netObject);        //give thread2 a notification
    }
    else {//do something else...}
    

    不要忘记同步“activateSync(netObject)”-方法,但前提是要对对象进行任何更改(不需要同步读取,只需要同步写入):

    public void activateSync(MyNetObject netObject){
        synchronize(netObject){
            //do whatever you wanna do with the object...the other thread will always get the actual object!
        }
    }
    

    这很简单,快速,一致。。。完全面向对象。希望你能明白我的意思

    更新:

    理解流或读取器实际上也是“解析器”是很重要的。有一个重要的区别:流(通常)是网络驱动的类,用于写入和读取任何类型的数据——字符除外。而reader可用于阅读任何类型的文本/字符。因此,正确的实现应该是这样的:读取带有一些流的传入数据包,然后将数据存储到适当的对象中。然后你就有了一个通用对象,你可以在任何类型的阅读器中使用它。如果您只有一张图片要读,可以在类ObjectInputStreamhttp://docs.oracle.com/javase/1.4.2/docs/api/java/io/ObjectInputStream.html)中尝试解析器readUTF(),它会生成一个字符串:

    BufferedInputStream bis = new BufferedInputStream(/*yourBasicInputStream*/);
    ObjectInputStream ois = new ObjectInputStream(bis);
    String string = ois.readUTF();   //Or another usable parser/method
    XmlParser1.read(string);      //for reads there is...
    XmlParser2.read(string);      //...no synchronisation needed!
    

    现在,唯一剩下的就是教解析器如何读取该字符串。而对象字符串本身可以被视为“接收器”。如果这不适用于您,只需找到另一个解析器/方法来创建“sink”对象

    请注意,这里讨论的解决方案——使用ObjectInputStream类和适当的解析器——在许多情况下都能工作,也适用于大数据(然后在通过网络发送之前,只需将一个1GB的文件切成几个字符串/对象“包”,就像torrents一样)。但它不适用于视频/音频流,在视频/音频流中,可能会有数据包丢失,并且无论如何都需要完全不同的解决方案(这本身就是一门科学:http://www.google.ch/search?q=video+stream+packet+drop