java如何根据消息中的值将元组发送到不同的螺栓
我有一个风暴群连接到凯尼斯河。消息是这样的
{
_c: "a"
}
或者应该是这样
{
_c: "b"
}
我想向一个螺栓发送带有_c=“a”的元组,并向另一个螺栓发送带有_c=“b”的元组。我如何做到这一点
这就是使用GSon将消息从Kinesis解析为JSON对象的方法
@Override
public void execute(Tuple tuple) {
String partitionKey = (String) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_PARTITION_KEY);
String sequenceNumber = (String) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_SEQUENCE_NUMBER);
byte[] payload = (byte[]) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_RECORD_DATA);
ByteBuffer buffer = ByteBuffer.wrap(payload);
String data = null;
try {
data = decoder.decode(buffer).toString();
HashMap < String, String > map = new Gson().fromJson(data, new TypeToken < HashMap < String, Object >> () {}.getType());
this.outputCollector.emit(tuple, new Values(map));
this.outputCollector.ack(tuple);
} catch (CharacterCodingException e) {
this.outputCollector.fail(tuple);
}
}
谢谢
# 1 楼答案
您可以在bolt中定义两个流,然后声明两个outputstreams:
然后,在拓扑中,可以使用
ShuffleGrouping
中的选项传递流id另一种可能是将其发送到两个螺栓,然后检查两个螺栓中的值并执行所需代码