有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java如何根据消息中的值将元组发送到不同的螺栓

我有一个风暴群连接到凯尼斯河。消息是这样的

{
    _c: "a"
}

或者应该是这样

{
    _c: "b"
}

我想向一个螺栓发送带有_c=“a”的元组,并向另一个螺栓发送带有_c=“b”的元组。我如何做到这一点

这就是使用GSon将消息从Kinesis解析为JSON对象的方法

@Override
public void execute(Tuple tuple) {
  String partitionKey = (String) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_PARTITION_KEY);
  String sequenceNumber = (String) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_SEQUENCE_NUMBER);
  byte[] payload = (byte[]) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_RECORD_DATA);

  ByteBuffer buffer = ByteBuffer.wrap(payload);
  String data = null;
  try {
    data = decoder.decode(buffer).toString();

    HashMap < String, String > map = new Gson().fromJson(data, new TypeToken < HashMap < String, Object >> () {}.getType());

    this.outputCollector.emit(tuple, new Values(map));
    this.outputCollector.ack(tuple);

  } catch (CharacterCodingException e) {
    this.outputCollector.fail(tuple);
  }

}

谢谢


共 (1) 个答案

  1. # 1 楼答案

    您可以在bolt中定义两个流,然后声明两个outputstreams:

    @Override
    public void execute(Tuple tuple) {
        // ...
        // Some Code
        // ...
        if (_c =="a") {
        collector.emit("stream1", tuple, new Values(_c));
        } else {
        collector.emit("stream2", tuple, new Values(_c));
        }
    
    }
    
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream("stream1", new Fields("_c"));
        outputFieldsDeclarer.declareStream("stream2", new Fields("_c"));
    } }
    

    然后,在拓扑中,可以使用ShuffleGrouping中的选项传递流id

    topology.setBolt("FirstBolt",new FirstBolt(),1);    
    topology.setBolt("newBolt1", new Custombolt(),1).shuffleGrouping("FirstBolt", "stream1");
    topology.setBolt("newBolt2", new Custombolt(),1).shuffleGrouping("FirstBolt", "stream2");
    

    另一种可能是将其发送到两个螺栓,然后检查两个螺栓中的值并执行所需代码