java每次向另一个方法发送大约固定大小的字节数组
我有一个方法,它接受一个参数Partition
enum。在同一时间段内,多个后台线程(最多15个)将通过传递不同的partition
值来调用此方法。这里dataHoldersByPartition
是Partition
和ConcurrentLinkedQueue<DataHolder>
的ImmutableMap
private final ImmutableMap<Partition, ConcurrentLinkedQueue<DataHolder>> dataHoldersByPartition;
//... some code to populate entry in `dataHoldersByPartition` map
private void validateAndSend(final Partition partition) {
ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);
Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder = new HashMap<>();
int totalSize = 0;
DataHolder dataHolder;
while ((dataHolder = dataHolders.poll()) != null) {
byte[] clientKeyBytes = dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8);
if (clientKeyBytes.length > 255)
continue;
byte[] processBytes = dataHolder.getProcessBytes();
int clientKeyLength = clientKeyBytes.length;
int processBytesLength = processBytes.length;
int additionalLength = clientKeyLength + processBytesLength;
if (totalSize + additionalLength > 50000) {
Message message = new Message(clientKeyBytesAndProcessBytesHolder, partition);
// here size of `message.serialize()` byte array should always be less than 50k at all cost
sendToDatabase(message.getAddress(), message.serialize());
clientKeyBytesAndProcessBytesHolder = new HashMap<>();
totalSize = 0;
}
clientKeyBytesAndProcessBytesHolder.put(clientKeyBytes, processBytes);
totalSize += additionalLength;
}
// calling again with remaining values only if clientKeyBytesAndProcessBytesHolder is not empty
if(!clientKeyBytesAndProcessBytesHolder.isEmpty()) {
Message message = new Message(partition, clientKeyBytesAndProcessBytesHolder);
// here size of `message.serialize()` byte array should always be less than 50k at all cost
sendToDatabase(message.getAddress(), message.serialize());
}
}
下面是我的Message
课程:
public final class Message {
private final byte dataCenter;
private final byte recordVersion;
private final Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder;
private final long address;
private final long addressFrom;
private final long addressOrigin;
private final byte recordsPartition;
private final byte replicated;
public Message(Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder, Partition recordPartition) {
this.clientKeyBytesAndProcessBytesHolder = clientKeyBytesAndProcessBytesHolder;
this.recordsPartition = (byte) recordPartition.getPartition();
this.dataCenter = Utils.CURRENT_LOCATION.get().datacenter();
this.recordVersion = 1;
this.replicated = 0;
long packedAddress = new Data().packAddress();
this.address = packedAddress;
this.addressFrom = 0L;
this.addressOrigin = packedAddress;
}
// Output of this method should always be less than 50k always
public byte[] serialize() {
// 36 + dataSize + 1 + 1 + keyLength + 8 + 2;
int bufferCapacity = getBufferCapacity(clientKeyBytesAndProcessBytesHolder);
ByteBuffer byteBuffer = ByteBuffer.allocate(bufferCapacity).order(ByteOrder.BIG_ENDIAN);
// header layout
byteBuffer.put(dataCenter).put(recordVersion).putInt(clientKeyBytesAndProcessBytesHolder.size())
.putInt(bufferCapacity).putLong(address).putLong(addressFrom).putLong(addressOrigin)
.put(recordsPartition).put(replicated);
// data layout
for (Map.Entry<byte[], byte[]> entry : clientKeyBytesAndProcessBytesHolder.entrySet()) {
byte keyType = 0;
byte[] key = entry.getKey();
byte[] value = entry.getValue();
byte keyLength = (byte) key.length;
short valueLength = (short) value.length;
ByteBuffer dataBuffer = ByteBuffer.wrap(value);
long timestamp = valueLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();
byteBuffer.put(keyType).put(keyLength).put(key).putLong(timestamp).putShort(valueLength)
.put(value);
}
return byteBuffer.array();
}
private int getBufferCapacity(Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder) {
int size = 36;
for (Entry<byte[], byte[]> entry : clientKeyBytesAndProcessBytesHolder.entrySet()) {
size += 1 + 1 + 8 + 2;
size += entry.getKey().length;
size += entry.getValue().length;
}
return size;
}
// getters and to string method here
}
基本上,我必须确保的是,每当在validateAndSend
方法中调用sendToDatabase
方法时,message.serialize()
字节数组的大小无论如何都应该小于50k。我的sendToDatabase
方法发送来自serialize
方法的字节数组。例如,如果我在dataHolders
CLQ中有60k条记录,那么我将在validateAndSend
方法中发送两个数据块:
- 首先,我将创建一个大约小于50k的字节数组(意味着从
message.serialize()
出来的字节数组小于50k)并对其调用sendToDatabase
方法李> - 其次,我将调用
sendToDatabase
方法来重新定义部分李>
为了完成上述任务,我在validateAndSend
方法中使用了totalSize
变量,该方法试图测量50k大小,但看起来我的方法可能不正确,我可能会丢弃一些记录或每次发送超过50k
看起来我的Message
类知道clientKeyBytesAndProcessBytesHolder
映射,我可以使用这个映射通过调用getBufferCapacity
方法来精确定义大小,如果大约小于50k,那么调用sendToDatabase
方法
# 1 楼答案
因此,这里是我的尝试(问题可能最好向代码审查社区提出,但无论如何)。它依赖于对
Message
的一些设计更改,因此它更像一个Builder
模式。缓冲区成为消息的一部分。它的占用是通过对BufferOverflowException
异常进行反应来控制的。一旦发生,缓冲区回滚到上次成功添加的结果,将分配新消息,并重试尝试添加相同的数据段。缓冲区完成后,记录总数和总大小将写入标头,整个缓冲区将转储到字节数组中(我可能会尝试避免这种额外的转换,并直接在sendToDatabase
中对缓冲区进行操作,但这超出了目前的范围):# 2 楼答案
通过对职责进行排序,您可能会得到更清晰的代码。 目前,
Message
类负责将数据持有者项转换为序列化形式。但同时,预计t将确保满足尺寸限制。不幸的是,调用方法正在检查大小预期,而不知道Message
类的大小要求我建议将责任放在向
Message
类发送适当的数据垃圾上,从而删除对Message
类本身的“关于适当数据垃圾格式的知识”您可能还注意到,当前的实现考虑了每个项目的完整标题大小,而每个
serialize()
只添加一次标题请在下面找到建议改进的草图。代码需要进一步完善。但它主要用于说明结构和可读性/可维护性方面的基本改进
为了从
Message
类中隔离sendToDatabase()
功能,我刚刚添加了一个简单的接口:消息类更改为处理垃圾和大小限制。现在是
Closeable
,表示您应该最终调用close()
。(因此,您可以考虑使用当前版本的java的适当构造)//属性:itemBuffer大小<;马克斯 }
最后,您的调用代码将变异为:
请注意,我在可能需要注意的地方添加了一些标记(
XXX
)。(但是,这些可以从所提供信息之外的信息进行解释)还有一些细节可以考虑。 例如,我不相信使用
ByteBuffer
是给定用例的适当集合(在大多数地方)编辑: 关于测试,由于代码的小尺寸,您可能会考虑应用形式验证(至少部分地)。这与现代编译器对静态代码分析的要求类似:您(用纸和笔)遍历您的代码,并派生出在该位置保持不变的属性。我在上面的代码中添加了注释(标记为
//Properties
),以说明这样做可能会得到什么结果。(注意:这是一个简单的例子,肯定需要为每个语句派生和执行更多属性)。我只是对结果缓冲区大小做了一些最小的属性。(使用MAX' as placeholder for the maximum acceptable size of the item part of the final buffer, aka
MAX\u SIZE-HEADER\u SIZE`)当然,也许人们会(正确地)建议为关键案例编写测试。在这种情况下,这将是白盒测试。在(已知)实现的拐角处测试代码的正确功能。您还需要使用黑盒测试来测试代码相对于规范的行为
此外,您还可以添加运行时检查,以确保关键部件的正确行为。例如,在执行
sendToDatabase()
时,您可以检查最大尺寸要求。然而,这样的测试需要适当的输入来合理化正确的行为。使用通过静态分析从代码中派生的属性,可以提供良好行为的证明,而不会最终怀疑没有找到一个可能导致失败的测试用例