有 Java 编程相关的问题?




  private final ImmutableMap<Partition, ConcurrentLinkedQueue<DataHolder>> dataHoldersByPartition;

  //... some code to populate entry in `dataHoldersByPartition` map

  private void validateAndSend(final Partition partition) {  
    ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);
    Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder = new HashMap<>();
    int totalSize = 0;      
    DataHolder dataHolder;
    while ((dataHolder = dataHolders.poll())  != null) {      
      byte[] clientKeyBytes = dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8);
      if (clientKeyBytes.length > 255)

      byte[] processBytes = dataHolder.getProcessBytes();
      int clientKeyLength = clientKeyBytes.length;
      int processBytesLength = processBytes.length;

      int additionalLength = clientKeyLength + processBytesLength;
      if (totalSize + additionalLength > 50000) {
        Message message = new Message(clientKeyBytesAndProcessBytesHolder, partition);
        // here size of `message.serialize()` byte array should always be less than 50k at all cost
        sendToDatabase(message.getAddress(), message.serialize());
        clientKeyBytesAndProcessBytesHolder = new HashMap<>();
        totalSize = 0;
      clientKeyBytesAndProcessBytesHolder.put(clientKeyBytes, processBytes);
      totalSize += additionalLength;
    // calling again with remaining values only if clientKeyBytesAndProcessBytesHolder is not empty
    if(!clientKeyBytesAndProcessBytesHolder.isEmpty()) {
        Message message = new Message(partition, clientKeyBytesAndProcessBytesHolder);
        // here size of `message.serialize()` byte array should always be less than 50k at all cost
        sendToDatabase(message.getAddress(), message.serialize());      


/**
 * A batch of client-key/process-bytes pairs for one partition, together with the metadata fields
 * (data center, record version, addresses, replication flag) assigned in the constructor.
 *
 * <p>NOTE(review): this listing appears to have lost lines (closing braces and the statements that
 * write the header and entries into the buffer), so only the visible statements are described.
 */
public final class Message {
  private final byte dataCenter;
  private final byte recordVersion;
  // Held by reference; the constructor does not copy the caller's map.
  private final Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder;
  private final long address;
  private final long addressFrom;
  private final long addressOrigin;
  private final byte recordsPartition;
  private final byte replicated;

  /**
   * Creates a message for the given pairs and partition.
   *
   * @param clientKeyBytesAndProcessBytesHolder key bytes mapped to value bytes; stored as-is
   * @param recordPartition partition whose number is narrowed to a single byte
   */
  public Message(Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder, Partition recordPartition) {
    this.clientKeyBytesAndProcessBytesHolder = clientKeyBytesAndProcessBytesHolder;
    this.recordsPartition = (byte) recordPartition.getPartition();
    this.dataCenter = Utils.CURRENT_LOCATION.get().datacenter();
    this.recordVersion = 1;
    this.replicated = 0;
    // The same packed address is used for both address and addressOrigin; addressFrom is 0.
    long packedAddress = new Data().packAddress();
    this.address = packedAddress;
    this.addressFrom = 0L;
    this.addressOrigin = packedAddress;

  // The output of this method is required to stay below 50k bytes. Nothing in this method
  // enforces that: the buffer is sized from the map contents, so the caller must limit the batch.
  public byte[] serialize() {
    // Capacity = 36 (fixed part) + per entry: 1 + 1 + 8 + 2 + keyLength + valueLength.
    int bufferCapacity = getBufferCapacity(clientKeyBytesAndProcessBytesHolder);

    ByteBuffer byteBuffer = ByteBuffer.allocate(bufferCapacity).order(ByteOrder.BIG_ENDIAN);
    // header layout
    // NOTE(review): the statements writing the header are not present in this listing.

    // data layout
    for (Map.Entry<byte[], byte[]> entry : clientKeyBytesAndProcessBytesHolder.entrySet()) {
      byte keyType = 0;
      byte[] key = entry.getKey();
      byte[] value = entry.getValue();
      // Narrowing casts: the key length is kept in one byte, the value length in two.
      byte keyLength = (byte) key.length;
      short valueLength = (short) value.length;

      ByteBuffer dataBuffer = ByteBuffer.wrap(value);
      // Values longer than 10 bytes supply the timestamp as the long at offset 2; shorter values
      // fall back to the current time.
      long timestamp = valueLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();

    // Returns the backing array of the buffer allocated above.
    return byteBuffer.array();

  // Computes the serialized size: 36 fixed bytes plus, for each entry, 12 bytes of framing
  // (1 + 1 + 8 + 2) and the lengths of the key and value arrays.
  private int getBufferCapacity(Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder) {
    int size = 36;
    for (Entry<byte[], byte[]> entry : clientKeyBytesAndProcessBytesHolder.entrySet()) {
      size += 1 + 1 + 8 + 2;
      size += entry.getKey().length;
      size += entry.getValue().length;
    return size;

    // getters and to string method here


  • 首先,我会构造一个略小于 50k 的字节数组(即 message.serialize() 返回的字节数组小于 50k),并用它调用 sendToDatabase 方法
  • 其次,我会对剩余的部分再次调用 sendToDatabase 方法



共 (2) 个答案

  1. # 1 楼答案


    // TODO: structure has been adjusted for testing purposes
    // Enum wrapping an int partition number.
    // NOTE(review): braces and the enum constants are missing from this listing; a constant
    // named A is referenced elsewhere on the page.
    enum Partition
        private final int _partition;
        // Returns the partition number given to the constructor.
        int getPartition()
            return _partition;
        Partition(final int partition)
            _partition = partition;
    // TODO: structure has been adjusted for testing purposes
    // Test stand-in that pairs a client key with a byte payload built from a string value.
    final static class DataHolder
        private final String _clientKey;
        private final byte[] _processBytes;
        public DataHolder(
            final String clientKey,
            final String value)
            _clientKey = clientKey;
            // getBytes() without an argument uses the platform default charset.
            byte[] valueBytes = value.getBytes();
            // simulate payload including extra bytes for the header
            // NOTE(review): the statement below is cut off in this listing (no terminator and no
            // writes into the buffer are shown); 4 + 8 extra bytes are reserved before the value.
            final ByteBuffer buffer = ByteBuffer.allocate(4 + 8 + valueBytes.length)
            _processBytes = readToBytes(buffer);
        // Returns the client key passed to the constructor.
        String getClientKey()
            return _clientKey;
        // Returns the payload array built in the constructor (not a copy).
        byte[] getProcessBytes()
            return _processBytes;
    // API has been changed to something more like the Builder pattern
    // Message that owns a fixed-capacity buffer; pairs are appended through add() and the final
    // bytes are produced by serialize().
    // NOTE(review): braces, the try keyword, the Javadoc delimiters and the statements that write
    // into the buffer are missing from this listing.
    final static class Message
        private final long address;
        private final long addressFrom;
        private final long addressOrigin;
        private final byte recordsPartition;
        private final byte replicated;
        private final ByteBuffer buffer;
        // Buffer position recorded at the end of the constructor; serialize() writes the totals there.
        private final int writeStatsPosition;
        private int payloadCount;
        // sizeLimit becomes the capacity of the big-endian buffer, i.e. the maximum message size.
        Message(Partition recordPartition, int sizeLimit)
            this.recordsPartition = (byte) recordPartition.getPartition();
            this.replicated = 0;
            // TODO: temporarily replaced with a hard-coded constant
            long packedAddress = 123456789L;
            this.address = packedAddress;
            this.addressFrom = 0L;
            this.addressOrigin = packedAddress;
            buffer = ByteBuffer.allocate(sizeLimit).order(ByteOrder.BIG_ENDIAN);
            // TODO: temporarily replaced with a hard-coded constant
            byte dataCenter = 0x1;
            byte recordVersion = 1;
            writeStatsPosition = buffer.position();
         * Tries to add another pair of client key and process bytes to
         * the current message. Returns true if successfully added, false -
         * if the data cannot be accommodated due to message binary size limit.
        boolean add(byte[] key, byte[] value)
                byte keyType = 0;
                // Narrowing casts: the key length is kept in one byte, the value length in two.
                byte keyLength = (byte) key.length;
                short valueLength = (short) value.length;
                ByteBuffer valueAsBuffer = ByteBuffer.wrap(value);
                // Values longer than 10 bytes supply the timestamp as the long at offset 2.
                long timestamp = valueAsBuffer.capacity() > 10 ? valueAsBuffer.getLong(2) : System.currentTimeMillis();
                // remember position in the buffer to roll back to in case of overflow
                return true;
            // Overflow of the fixed-capacity buffer is reported to the caller as false.
            catch (BufferOverflowException e)
                // NOTE(review): an operator appears to have been lost on the next line,
                // presumably a decrement undoing an earlier increment - confirm against the original.
                payloadCount ;
                return false;
        byte[] serialize()
            int finalPosition = buffer.position();
            // adjust the message header with the totals
            // Absolute puts: these do not move the buffer's current position.
            buffer.putInt(writeStatsPosition, payloadCount)
                  .putInt(writeStatsPosition + 4, finalPosition);
            return readToBytes(buffer);
    // Drains the partition's queue into messages obtained from messageFactory. When add() rejects
    // a pair, the current message is serialized and sent, and a fresh message is used for the retry.
    // NOTE(review): braces, the tail of the keyBytes statement and the body of the length guard
    // are missing from this listing.
    static void validateAndSend(final Partition partition, final Supplier<Message> messageFactory)
        throws InterruptedException
        final ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);
        Message message = messageFactory.get();
        DataHolder dataHolder;
        while ((dataHolder = dataHolders.poll()) != null)
            final byte[] keyBytes = dataHolder.getClientKey()
            final int keyLength = keyBytes.length;
            if (keyLength > 255)
            // Retry the same pair on a new message until it is accepted.
            while (!message.add(keyBytes, dataHolder.getProcessBytes()))
                // TODO: consider proper handling of the case when the buffer size is too small to accept even a single pair
                // Guards against looping forever: a rejected pair on an empty message can never fit.
                Preconditions.checkState(message.payloadCount > 0,
                    "buffer size too small to accommodate payload");
                final byte[] serializedMessage = message.serialize();
                // TODO: makes sense to introduce a message consumer interface and call it here instead of sendToDatabase() - simplifies testing
                sendToDatabase(message.address, serializedMessage);
                message = messageFactory.get();
        // Send the last, partially filled message if it holds anything.
        if (message.payloadCount > 0)
            byte[] serializedMessage = message.serialize();
            sendToDatabase(message.address, serializedMessage);
    // Test stub: prints the payload length, the target address and the payload as hex to
    // standard output instead of contacting a database.
    static void sendToDatabase(long address, byte[] serializedMessage)
        // TODO: added simulating activity
        System.out.printf("Sending %d bytes to %d: %s%n",
            serializedMessage.length, address, DatatypeConverter.printHexBinary(serializedMessage));
    // Returns a new array sized to the buffer's remaining bytes.
    // NOTE(review): the statement that copies the buffer contents into the array is not present
    // in this listing; as shown, the returned array is never filled.
    static byte[] readToBytes(ByteBuffer buffer)
        byte[] bytes = new byte[buffer.remaining()];
        return bytes;
    // Demo driver: queues six sample entries for Partition.A and runs validateAndSend with a
    // deliberately small message size so that several messages are produced.
    public static void main(String[] args)
        throws ExecutionException, InterruptedException
        // TODO: using small value for testing - must be set to 50K in real case
        final int maxMessageSize = 80;
        // Every call yields a new, empty message bound to Partition.A with the size limit above.
        final Supplier<Message> messageFactory = new Supplier<Message>()
            public Message get()
                return new Message(Partition.A, maxMessageSize);
        final ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(Partition.A);
        dataHolders.add(new DataHolder("0000000001", "alpha"));
        dataHolders.add(new DataHolder("0000000002", "bravo"));
        dataHolders.add(new DataHolder("0000000003", "charlie"));
        dataHolders.add(new DataHolder("0000000004", "delta"));
        dataHolders.add(new DataHolder("0000000005", "echo"));
        dataHolders.add(new DataHolder("0000000006", "foxtrot"));
        validateAndSend(Partition.A, messageFactory);
  2. # 2 楼答案

    通过理清各部分的职责,您可能会得到更清晰的代码。目前,Message 类负责将数据持有者条目转换为序列化形式,但同时又期望它能确保满足大小限制。不幸的是,检查大小限制的是调用方方法,而它并不了解 Message 类对大小的具体要求





    // decoupling the sending logic from the formatting
    // if external requirements suggest linking such functionality into the message class
    // such interface would be unnecessary
    // Callback through which a finished message payload is handed over for delivery.
    public interface DatabaseDelivery {
        // Delivers messagePayload to the given address.
        void sendToDatabase(long addres, byte[] messagePayload);


    // Message that accumulates items in an internal buffer and hands the payload to a
    // DatabaseDelivery; close() is declared so that pending items can be dealt with at the end.
    // NOTE(review): closing braces and several statements (early returns, the addHeader body, the
    // calls made when a size limit is hit, the close() body) are missing from this listing.
    public final class Message implements Closeable {
        // or initialize it from some external source if this might change dynamically
        private static final int MAX_SIZE = 50000;
        // better determine this in sync with addHeader() method
        private static final int HEADER_SIZE = 36;
        private final byte dataCenter;
        private final byte recordVersion;
        private final long address;
        private final long addressFrom;
        private final long addressOrigin;
        private final byte recordsPartition;
        private final byte replicated;
        private final DatabaseDelivery delivery;
        // Holds the serialized items gathered so far; its position is the number of bytes pending.
        private final ByteBuffer itemBuffer = ByteBuffer.allocate(MAX_SIZE);
        private int pendingItems = 0;
        public Message(final Partition recordPartition, final DatabaseDelivery databaseDelivery) {
            this.recordsPartition = (byte) recordPartition.getPartition();
            this.dataCenter = Utils.CURRENT_LOCATION.get().datacenter();
            this.recordVersion = 1;
            this.replicated = 0;
            final long packedAddress = new Data().packAddress();
            this.address = packedAddress;
            this.addressFrom = 0L;
            this.addressOrigin = packedAddress;
            this.delivery = databaseDelivery;
        // NOTE(review): body not shown in this listing.
        private void addHeader(final ByteBuffer buffer, final int items) {
        // Builds the outgoing buffer and passes it to the delivery callback, then resets the count.
        // NOTE(review): no statement copying itemBuffer into the outgoing buffer is visible, and
        // the copyOf call below has unbalanced parentheses as shown.
        private void sendData() {
            if (itemBuffer.position() == 0) {
                // no data to be sent
                //Properties: itemBuffer serialized size == 0
            final ByteBuffer buffer = ByteBuffer.allocate(MAX_SIZE);
            addHeader(buffer, pendingItems);
            delivery.sendToDatabase(address, Arrays.copyOf(buffer.array(),buffer.position());
            pendingItems = 0;
            //Properties: itemBuffer serialized size == 0                
        // Appends one key/data item to itemBuffer; the size checks below decide when the data
        // gathered so far has to go out first.
        public void addAndSendJunked(final byte[] key, final byte[] data) {
            if (key.length > 255) {
            if (data.length > 255) {
            // NOTE(review): byte is signed in Java, so lengths above 127 become negative after
            // these casts and then feed into additionalSize - confirm the intended types.
            final byte keyLength = (byte) key.length;
            final byte dataLength = (byte) data.length;
            // Item size: both lengths plus 1 + 1 + 8 + 2 bytes of framing.
            final int additionalSize = dataLength + keyLength + 1 + 1 + 8 + 2;
            final int newSize = itemBuffer.position() + additionalSize;
            //Properties: itemBuffer serialized size < MAX
            if (newSize >= (MAX_SIZE-HEADER_SIZE)) {
            if (additionalSize > (MAX_SIZE-HEADER_SIZE)) {
                //XXX Use exception that is appropriate for your application
                //XXX You might add sizes involved for ease of analysis
                throw new AppConfigurationException("Size of single item exceeds maximum size");
            //Properties: itemBuffer size (old+new or new) < MAX 
            final ByteBuffer dataBuffer = ByteBuffer.wrap(data);
            // Data longer than 10 bytes supplies the timestamp as the long at offset 2.
            final long timestamp = dataLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();
            // data layout
            itemBuffer.put((byte) 0).put(keyLength).put(key).putLong(timestamp).putShort(dataLength).put(data);
            pendingItems++ ;

            //Properties: itemBuffer size < MAX

        public void close() {
            if (pendingItems > 0) {


    // Drains the partition's queue and feeds every entry to a single Message, which decides on
    // its own when data is handed to the delivery callback.
    // NOTE(review): closing braces and anything following the loop are missing from this listing.
    private void validateAndSend(final Partition partition) {
        final ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);
        // the instance providing sendToDatabase() method
        // just for cutting off details external to the discussion
        final DatabaseDelivery delivery = this;
        // The local "delivery" above is not used; "this" is passed directly.
        final Message message = new Message(partition, this);
        DataHolder dataHolder;
        while ((dataHolder = dataHolders.poll()) != null) {
            // XXX: why is client key using explicit encoding while process bytes is not?
            message.addAndSendJunked(dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8), dataHolder.getProcessBytes());


    还有一些细节可以考虑。 例如,我不相信使用ByteBuffer是给定用例的适当集合(在大多数地方)

    编辑: 关于测试,由于代码规模较小,您可以考虑(至少部分地)应用形式化验证。这与现代编译器进行静态代码分析时所做的类似:您(用纸和笔)逐行走查代码,并推导出在每个位置保持成立的属性。我在上面的代码中添加了注释(标记为 //Properties),以说明这样做可能会得到什么结果。(注意:这只是一个简单的例子,实际上需要为每条语句推导并检查更多属性。)我只对结果缓冲区的大小给出了一些最基本的属性。(其中 `MAX` 是最终缓冲区中条目部分的最大可接受大小的占位符,即 `MAX_SIZE - HEADER_SIZE`)

