Java: sending a byte array of roughly fixed size to another method on each call

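I have a method that takes a Partition enum as its parameter. Multiple background threads (at most 15) call this method during the same period, each passing a different partition value. Here dataHoldersByPartition is an ImmutableMap from Partition to ConcurrentLinkedQueue<DataHolder>.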

  private final ImmutableMap<Partition, ConcurrentLinkedQueue<DataHolder>> dataHoldersByPartition;

  //... some code to populate entry in `dataHoldersByPartition` map

  private void validateAndSend(final Partition partition) {  
    ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);
    Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder = new HashMap<>();
    int totalSize = 0;      
    DataHolder dataHolder;
    while ((dataHolder = dataHolders.poll())  != null) {      
      byte[] clientKeyBytes = dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8);
      if (clientKeyBytes.length > 255)
        continue;

      byte[] processBytes = dataHolder.getProcessBytes();
      int clientKeyLength = clientKeyBytes.length;
      int processBytesLength = processBytes.length;

      int additionalLength = clientKeyLength + processBytesLength;
      if (totalSize + additionalLength > 50000) {
        Message message = new Message(clientKeyBytesAndProcessBytesHolder, partition);
        // here size of `message.serialize()` byte array should always be less than 50k at all cost
        sendToDatabase(message.getAddress(), message.serialize());
        clientKeyBytesAndProcessBytesHolder = new HashMap<>();
        totalSize = 0;
      }
      clientKeyBytesAndProcessBytesHolder.put(clientKeyBytes, processBytes);
      totalSize += additionalLength;
    }
    // calling again with remaining values only if clientKeyBytesAndProcessBytesHolder is not empty
    if(!clientKeyBytesAndProcessBytesHolder.isEmpty()) {
        // argument order matches the Message(Map, Partition) constructor
        Message message = new Message(clientKeyBytesAndProcessBytesHolder, partition);
        // here size of `message.serialize()` byte array should always be less than 50k at all cost
        sendToDatabase(message.getAddress(), message.serialize());      
    }
  }

Below is my Message class:

public final class Message {
  private final byte dataCenter;
  private final byte recordVersion;
  private final Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder;
  private final long address;
  private final long addressFrom;
  private final long addressOrigin;
  private final byte recordsPartition;
  private final byte replicated;

  public Message(Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder, Partition recordPartition) {
    this.clientKeyBytesAndProcessBytesHolder = clientKeyBytesAndProcessBytesHolder;
    this.recordsPartition = (byte) recordPartition.getPartition();
    this.dataCenter = Utils.CURRENT_LOCATION.get().datacenter();
    this.recordVersion = 1;
    this.replicated = 0;
    long packedAddress = new Data().packAddress();
    this.address = packedAddress;
    this.addressFrom = 0L;
    this.addressOrigin = packedAddress;
  }

  // Output of this method should always be less than 50k always
  public byte[] serialize() {
    // 36 + dataSize + 1 + 1 + keyLength + 8 + 2;
    int bufferCapacity = getBufferCapacity(clientKeyBytesAndProcessBytesHolder);

    ByteBuffer byteBuffer = ByteBuffer.allocate(bufferCapacity).order(ByteOrder.BIG_ENDIAN);
    // header layout
    byteBuffer.put(dataCenter).put(recordVersion).putInt(clientKeyBytesAndProcessBytesHolder.size())
        .putInt(bufferCapacity).putLong(address).putLong(addressFrom).putLong(addressOrigin)
        .put(recordsPartition).put(replicated);

    // data layout
    for (Map.Entry<byte[], byte[]> entry : clientKeyBytesAndProcessBytesHolder.entrySet()) {
      byte keyType = 0;
      byte[] key = entry.getKey();
      byte[] value = entry.getValue();
      byte keyLength = (byte) key.length;
      short valueLength = (short) value.length;

      ByteBuffer dataBuffer = ByteBuffer.wrap(value);
      long timestamp = valueLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();

      byteBuffer.put(keyType).put(keyLength).put(key).putLong(timestamp).putShort(valueLength)
          .put(value);
    }
    return byteBuffer.array();
  }

  private int getBufferCapacity(Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder) {
    int size = 36;
    for (Entry<byte[], byte[]> entry : clientKeyBytesAndProcessBytesHolder.entrySet()) {
      size += 1 + 1 + 8 + 2;
      size += entry.getKey().length;
      size += entry.getValue().length;
    }
    return size;
  }

    // getters and to string method here
}

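Basically, what I have to make sure is that whenever sendToDatabase is called in validateAndSend, the size of the message.serialize() byte array is always less than 50k, no matter what. My sendToDatabase method sends the byte array that comes out of serialize. For example, if the dataHolders CLQ holds 60k records, I will send the data in two chunks from validateAndSend:

  • First, I build a byte array just under 50k (meaning the byte array coming out of message.serialize() is less than 50k) and call sendToDatabase on it.
  • Second, I call sendToDatabase for the remaining part.

To accomplish this, I use the totalSize variable in validateAndSend, which tries to measure the 50k size, but it looks like my approach may be wrong: I might be dropping some records or sending more than 50k each time.

It looks like my Message class knows about the clientKeyBytesAndProcessBytesHolder map, so I could use this map to determine the exact size by calling getBufferCapacity, and call sendToDatabase only if the result is just under 50k.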
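
To make the size concern concrete: the check in validateAndSend counts only key and value bytes, while getBufferCapacity also counts a 36-byte header per message and 12 bytes of overhead per record. The following is a minimal sketch, not the original code, of batching with that exact accounting. The class BatchPlanner and its methods are hypothetical names introduced for illustration; the constants are taken from getBufferCapacity.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: groups records so each serialized message stays below a limit. */
final class BatchPlanner {
    static final int HEADER_SIZE = 36;                 // fixed header, as in getBufferCapacity
    static final int RECORD_OVERHEAD = 1 + 1 + 8 + 2;  // keyType + keyLength + timestamp + valueLength

    /** Serialized size of one record, excluding the message header. */
    static int recordSize(byte[] key, byte[] value) {
        return RECORD_OVERHEAD + key.length + value.length;
    }

    /**
     * Returns batches of record indices. For every batch, HEADER_SIZE plus the
     * sizes of its records is strictly below maxBytes. keys and values are parallel lists.
     */
    static List<List<Integer>> plan(List<byte[]> keys, List<byte[]> values, int maxBytes) {
        List<List<Integer>> batches = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        int size = HEADER_SIZE;
        for (int i = 0; i < keys.size(); i++) {
            int add = recordSize(keys.get(i), values.get(i));
            if (HEADER_SIZE + add >= maxBytes) {
                throw new IllegalArgumentException("record " + i + " does not fit in an empty message");
            }
            if (size + add >= maxBytes) {
                // adding this record would reach the limit: close the current batch first
                batches.add(current);
                current = new ArrayList<>();
                size = HEADER_SIZE;
            }
            current.add(i);
            size += add;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<byte[]> keys = new ArrayList<>();
        List<byte[]> values = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            keys.add(new byte[10]);
            values.add(new byte[17]);
        }
        // each record takes 12 + 10 + 17 = 39 bytes, so two records fit under a 120-byte limit
        System.out.println(plan(keys, values, 120));
    }
}
```

Because the flush happens before the record that would reach the limit is added, no record is dropped and no batch reaches maxBytes.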


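2 answers

  1. Answer #1

    So, here is my attempt (the question would probably be better addressed to the Code Review community, but anyway). It relies on some design changes to Message so that it becomes more like the Builder pattern. The buffer becomes part of the message. Its occupancy is controlled by reacting to BufferOverflowException. Once that occurs, the buffer is rolled back to the state after the last successful addition, a new message is allocated, and adding the same piece of data is retried. Once the buffer is complete, the total record count and total size are written to the header, and the whole buffer is dumped into a byte array (I would probably try to avoid this extra conversion and operate on the buffer directly in sendToDatabase, but that is out of scope for now):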

    // TODO: structure has been adjusted for testing purposes
    enum Partition
    {
        A(0x1);
    
        private final int _partition;
    
        int getPartition()
        {
            return _partition;
        }
    
        Partition(final int partition)
        {
            _partition = partition;
        }
    }
    
    // TODO: structure has been adjusted for testing purposes
    final static class DataHolder
    {
        private final String _clientKey;
        private final byte[] _processBytes;
    
        public DataHolder(
            final String clientKey,
            final String value)
        {
            _clientKey = clientKey;
            byte[] valueBytes = value.getBytes();
            // simulate payload including extra bytes for the header
            final ByteBuffer buffer = ByteBuffer.allocate(4 + 8 + valueBytes.length)
                                                .order(ByteOrder.BIG_ENDIAN);
            buffer.putInt(0).putLong(System.currentTimeMillis()).put(valueBytes);
            _processBytes = readToBytes(buffer);
        }
    
        String getClientKey()
        {
            return _clientKey;
        }
    
        byte[] getProcessBytes()
        {
            return _processBytes;
        }
    }
    
    // API has been changed to something more like the Builder pattern
    final static class Message
    {
        private final long address;
        private final long addressFrom;
        private final long addressOrigin;
        private final byte recordsPartition;
        private final byte replicated;
        private final ByteBuffer buffer;
        private final int writeStatsPosition;
        private int payloadCount;
    
        Message(Partition recordPartition, int sizeLimit)
        {
            this.recordsPartition = (byte) recordPartition.getPartition();
            this.replicated = 0;
            // TODO: temporarily replaced with a hard-coded constant
            long packedAddress = 123456789L;
            this.address = packedAddress;
            this.addressFrom = 0L;
            this.addressOrigin = packedAddress;
            buffer = ByteBuffer.allocate(sizeLimit).order(ByteOrder.BIG_ENDIAN);
            // TODO: temporarily replaced with a hard-coded constant
            byte dataCenter = 0x1;
            byte recordVersion = 1;
            buffer.put(dataCenter).put(recordVersion);
            writeStatsPosition = buffer.position();
            // reserve 8 bytes for the record count and total size; serialize() fills them in
            buffer.putInt(0).putInt(0);
            buffer.putLong(address).putLong(addressFrom).putLong(addressOrigin)
                      .put(recordsPartition).put(replicated);
        }
    
        /**
         * Tries to add another pair of client key and process bytes to
         * the current message. Returns true if successfully added, false -
         * if the data cannot be accommodated due to message binary size limit.
         */
        boolean add(byte[] key, byte[] value)
        {
            try
            {
                byte keyType = 0;
                byte keyLength = (byte) key.length;
                short valueLength = (short) value.length;
                ByteBuffer valueAsBuffer = ByteBuffer.wrap(value);
                long timestamp = valueAsBuffer.capacity() > 10 ? valueAsBuffer.getLong(2) : System.currentTimeMillis();
                payloadCount++;
                // remember position in the buffer to roll back to in case of overflow
                buffer.mark();
                buffer.put(keyType).put(keyLength).put(key);
                buffer.putLong(timestamp).putShort(valueLength).put(value);
    
                return true;
            }
            catch (BufferOverflowException e)
            {
                payloadCount--;
                buffer.reset();
                return false;
            }
        }
    
        byte[] serialize()
        {
            int finalPosition = buffer.position();
            // adjust the message header with the totals
            buffer.putInt(writeStatsPosition, payloadCount)
                  .putInt(writeStatsPosition + 4, finalPosition);
            return readToBytes(buffer);
        }
    }
    
    static void validateAndSend(final Partition partition, final Supplier<Message> messageFactory)
        throws InterruptedException
    {
        final ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);
        Message message = messageFactory.get();
        DataHolder dataHolder;
        while ((dataHolder = dataHolders.poll()) != null)
        {
            final byte[] keyBytes = dataHolder.getClientKey()
                                        .getBytes(StandardCharsets.UTF_8);
            final int keyLength = keyBytes.length;
            if (keyLength > 255)
            {
                continue;
            }
    
            while (!message.add(keyBytes, dataHolder.getProcessBytes()))
            {
                // TODO: consider proper handling of the case when the buffer size is too small to accept even a single pair
                Preconditions.checkState(message.payloadCount > 0,
                    "buffer size too small to accommodate payload");
                final byte[] serializedMessage = message.serialize();
                // TODO: makes sense to introduce a message consumer interface and call it here instead of sendToDatabase() - simplifies testing
                sendToDatabase(message.address, serializedMessage);
                message = messageFactory.get();
            }
        }
        if (message.payloadCount > 0)
        {
            byte[] serializedMessage = message.serialize();
            sendToDatabase(message.address, serializedMessage);
        }
    }
    
    static void sendToDatabase(long address, byte[] serializedMessage)
    {
        // TODO: added simulating activity
        System.out.printf("Sending %d bytes to %d: %s%n",
            serializedMessage.length, address, DatatypeConverter.printHexBinary(serializedMessage));
    }
    
    static byte[] readToBytes(ByteBuffer buffer)
    {
        buffer.flip();
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);
        return bytes;
    }
    
    public static void main(String[] args)
        throws ExecutionException, InterruptedException
    {
        // TODO: using small value for testing - must be set to 50K in real case
        final int maxMessageSize = 80;
        final Supplier<Message> messageFactory = new Supplier<Message>()
        {
            @Override
            public Message get()
            {
                return new Message(Partition.A, maxMessageSize);
            }
        };
    
        final ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(Partition.A);
        dataHolders.add(new DataHolder("0000000001", "alpha"));
        dataHolders.add(new DataHolder("0000000002", "bravo"));
        dataHolders.add(new DataHolder("0000000003", "charlie"));
        dataHolders.add(new DataHolder("0000000004", "delta"));
        dataHolders.add(new DataHolder("0000000005", "echo"));
        dataHolders.add(new DataHolder("0000000006", "foxtrot"));
    
        validateAndSend(Partition.A, messageFactory);
    }
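
The rollback step that this answer relies on can be isolated into a few lines. This is a minimal sketch under simplified assumptions: the class RollbackDemo and the length-prefixed record layout are hypothetical and much smaller than the message format above; only ByteBuffer.mark(), ByteBuffer.reset() and BufferOverflowException are the real JDK API.

```java
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;

/** Hypothetical demo of rolling a ByteBuffer back after a failed write. */
final class RollbackDemo {
    /** Appends a length-prefixed record; on overflow restores the buffer and returns false. */
    static boolean tryAppend(ByteBuffer buffer, byte[] record) {
        buffer.mark(); // remember where this record starts
        try {
            buffer.putShort((short) record.length).put(record);
            return true;
        } catch (BufferOverflowException e) {
            buffer.reset(); // discard whatever part of the record was already written
            return false;
        }
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(10);
        boolean first = tryAppend(buffer, new byte[4]);  // needs 6 bytes, fits
        boolean second = tryAppend(buffer, new byte[4]); // length prefix fits, payload does not
        System.out.println(first + " " + second + " position=" + buffer.position());
    }
}
```

In the second call the two-byte length is written before the overflow is detected, which is why the mark has to be taken before the first put of each record.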
    
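  2. Answer #2

    By sorting out the responsibilities you might end up with cleaner code. Currently, the Message class is responsible for turning DataHolder items into serialized form. At the same time, it is expected to ensure that the size restriction is met. Unfortunately, the calling method checks the size expectation without knowing the size requirements of the Message class.

    I suggest making the Message class responsible for sending properly sized chunks of data, which moves the knowledge about proper chunk formatting into the Message class itself.

    You might also have noticed that the current implementation accounts for the full header size for every item, while the header is only added once per serialize().

    Please find a sketch of the suggested improvement below. The code needs further refinement, but it is primarily intended to illustrate the basic improvements in structure and readability/maintainability.

    To isolate the sendToDatabase() functionality from the Message class, I just added a simple interface: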

    // decoupling the sending logic from the formatting
    // if external requirements suggest linking such functionality into the message class
    // such interface would be unnecessary
    public interface DatabaseDelivery {
        void sendToDatabase(long address, byte[] messagePayload);
    }
    

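    The Message class is changed to handle the chunking and the size limit. It is now Closeable, indicating that you should eventually call close(). (So you might consider using the appropriate construct of current Java versions.)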

    public final class Message implements Closeable {
        // or initialize it from some external source if this might change dynamically
        private static final int MAX_SIZE = 50000;
        // better determine this in sync with addHeader() method
        private static final int HEADER_SIZE = 36;
    
        private final byte dataCenter;
        private final byte recordVersion;
        private final long address;
        private final long addressFrom;
        private final long addressOrigin;
        private final byte recordsPartition;
        private final byte replicated;
        private final DatabaseDelivery delivery;
        private final ByteBuffer itemBuffer = ByteBuffer.allocate(MAX_SIZE);
        private int pendingItems = 0;
    
        public Message(final Partition recordPartition, final DatabaseDelivery databaseDelivery) {
            this.recordsPartition = (byte) recordPartition.getPartition();
            this.dataCenter = Utils.CURRENT_LOCATION.get().datacenter();
            this.recordVersion = 1;
            this.replicated = 0;
            final long packedAddress = new Data().packAddress();
            this.address = packedAddress;
            this.addressFrom = 0L;
            this.addressOrigin = packedAddress;
            this.delivery = databaseDelivery;
        }
    
        private void addHeader(final ByteBuffer buffer, final int items) {
            buffer.put(dataCenter)
                  .put(recordVersion)
                  .putInt(items)
                  .putInt(buffer.capacity())
                  .putLong(address)
                  .putLong(addressFrom)
                  .putLong(addressOrigin)
                  .put(recordsPartition)
                  .put(replicated);
        }
    
        private void sendData() {
            if (itemBuffer.position() == 0) {
                // no data to be sent
                //Properties: itemBuffer serialized size == 0
                return;
            }
            // allocate exactly header + items, so the size written by addHeader() is the real message size
            final ByteBuffer buffer = ByteBuffer.allocate(HEADER_SIZE + itemBuffer.position());
            addHeader(buffer, pendingItems);
            itemBuffer.flip();
            buffer.put(itemBuffer);
            delivery.sendToDatabase(address, Arrays.copyOf(buffer.array(), buffer.position()));
            itemBuffer.clear();
            pendingItems = 0;
            //Properties: itemBuffer serialized size == 0
        }
    
        public void addAndSendJunked(final byte[] key, final byte[] data) {
            if (key.length > 255) {
                return;
            }
            // XXX the wire format stores the value length as a short, so this limit could be raised
            if (data.length > 255) {
                return;
            }
            // compute sizes from the int lengths: a byte cast turns 128..255 into negative values
            final int additionalSize = data.length + key.length + 1 + 1 + 8 + 2;
            final int newSize = itemBuffer.position() + additionalSize;
            //Properties: itemBuffer serialized size < MAX
            if (newSize >= (MAX_SIZE-HEADER_SIZE)) {
                sendData();
            }
            if (additionalSize > (MAX_SIZE-HEADER_SIZE)) {
                //XXX Use exception that is appropriate for your application
                //XXX You might add sizes involved for ease of analysis
                throw new AppConfigurationException("Size of single item exceeds maximum size");
            }
            //Properties: itemBuffer size (old+new or new) < MAX

            final ByteBuffer dataBuffer = ByteBuffer.wrap(data);
            final long timestamp = data.length > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();
            // data layout
            itemBuffer.put((byte) 0).put((byte) key.length).put(key).putLong(timestamp)
                      .putShort((short) data.length).put(data);
            pendingItems++;
            //Properties: itemBuffer size < MAX
        }

        @Override
        public void close() {
            if (pendingItems > 0) {
                sendData();
            }
        }
    }
    

    Finally, your calling code will change to:

    private void validateAndSend(final Partition partition) {
        final ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);
    
        // the instance providing sendToDatabase() method
        // just for cutting off details external to the discussion
        final DatabaseDelivery delivery = this;
        final Message message = new Message(partition, delivery);
    
        DataHolder dataHolder;
        while ((dataHolder = dataHolders.poll()) != null) {
            // XXX: why is client key using explicit encoding while process bytes is not?
            message.addAndSendJunked(dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8), dataHolder.getProcessBytes());
        }
        message.close();
    }
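
The remark about Message being Closeable presumably points at try-with-resources, which calls close() automatically even when the loop body throws. The sketch below shows only that construct; the class Batch is a hypothetical stand-in for the answer's Message and simply collects strings instead of serializing records.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical demo: a Closeable batch that flushes its pending items on close(). */
final class TryWithResourcesDemo {
    static final class Batch implements Closeable {
        private final List<String> pending = new ArrayList<>();
        private final List<String> sent;

        Batch(List<String> sent) {
            this.sent = sent;
        }

        void add(String item) {
            pending.add(item);
        }

        @Override
        public void close() { // narrowed signature: this close() throws no IOException
            sent.addAll(pending);
            pending.clear();
        }
    }

    /** Adds all items to a batch; the remaining items are flushed when the try block exits. */
    static List<String> drain(List<String> items) {
        List<String> sent = new ArrayList<>();
        try (Batch batch = new Batch(sent)) {
            for (String item : items) {
                batch.add(item);
            }
        } // close() runs here, also if add() had thrown
        return sent;
    }

    public static void main(String[] args) {
        List<String> items = new ArrayList<>();
        items.add("alpha");
        items.add("bravo");
        System.out.println(drain(items));
    }
}
```

Applied to validateAndSend, the explicit message.close() call would be replaced by declaring the Message in the try header.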
    

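    Please note that I added some markers (XXX) where attention might be needed. (These may, however, be explained by information beyond what has been provided.)

    There are some more details that could be considered. For example, I am not convinced that ByteBuffer is the appropriate collection for the given use case (in most places).

    Edit: Regarding testing, given the small size of the code you might consider applying formal verification (at least partially). This is similar to what modern compilers do for static code analysis: you walk through your code (with pen and paper) and derive the properties that hold at each location. I added comments to the code above (marked //Properties) to illustrate what you might get by doing so. (Note: this is a simple example, and more properties would certainly need to be derived and carried through for each statement.) I only derived some minimal properties for the resulting buffer size. (Using MAX as a placeholder for the maximum acceptable size of the item part of the final buffer, aka MAX_SIZE-HEADER_SIZE.)

    Of course, people may (rightly) suggest writing tests for the critical cases. In this case those would be white-box tests: they test correct functioning of the code at the corners of the (known) implementation. You would also need black-box tests that check the behavior of the code against the specification.

    In addition, you can add runtime checks to ensure correct behavior of critical parts. For example, when executing sendToDatabase() you could check the maximum size requirement. However, such testing needs suitable input to substantiate correct behavior. Using properties derived from the code by static analysis can provide proof of well-behaved code without the lingering doubt of not having found the one test case that would cause a failure.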
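
One way to add the runtime check mentioned above is a decorator around the delivery interface. This is a minimal sketch: the nested DatabaseDelivery interface mirrors the one from this answer, while SizeCheckedDelivery and its choice of IllegalStateException are assumptions made for illustration.

```java
/** Hypothetical demo: enforcing the size limit at the point of delivery. */
final class CheckedDeliveryDemo {
    /** Same shape as the DatabaseDelivery interface sketched in the answer. */
    interface DatabaseDelivery {
        void sendToDatabase(long address, byte[] messagePayload);
    }

    /** Decorator that rejects payloads at or above maxSize before delegating. */
    static final class SizeCheckedDelivery implements DatabaseDelivery {
        private final DatabaseDelivery delegate;
        private final int maxSize;

        SizeCheckedDelivery(DatabaseDelivery delegate, int maxSize) {
            this.delegate = delegate;
            this.maxSize = maxSize;
        }

        @Override
        public void sendToDatabase(long address, byte[] messagePayload) {
            if (messagePayload.length >= maxSize) {
                throw new IllegalStateException("payload of " + messagePayload.length
                    + " bytes violates the limit of " + maxSize);
            }
            delegate.sendToDatabase(address, messagePayload);
        }
    }

    public static void main(String[] args) {
        DatabaseDelivery real = (address, payload) ->
            System.out.println("sending " + payload.length + " bytes to " + address);
        DatabaseDelivery checked = new SizeCheckedDelivery(real, 50000);
        checked.sendToDatabase(123456789L, new byte[100]);
    }
}
```

Such a check only reports a violation when one happens at runtime; it complements the derived properties rather than replacing them.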