apacheflume从python scrip获取数据

2024-10-07 00:30:04 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在运行一个python脚本来从新闻提供者收集数据,并将此脚本源于水槽.conf文件。在

我的水槽.conf文件:

newsAgent.sources = r1
newsAgent.sinks = spark
newsAgent.channels = MemChannel

# Describe/configure the source
newsAgent.sources.r1.type = exec
newsAgent.sources.r1.command = python path_to/data_collector.py

# Describe the sink
newsAgent.sinks.spark.type = avro
newsAgent.sinks.spark.channel = memoryChannel
newsAgent.sinks.spark.hostname = localhost
newsAgent.sinks.spark.port = 4040

# Use a channel which buffers events in memory
newsAgent.channels.MemChannel.type = memory
newsAgent.channels.MemChannel.capacity = 10000
newsAgent.channels.MemChannel.transactionCapacity = 100

# Bind the source and sink to the channel
newsAgent.sources.r1.channels = MemChannel
newsAgent.sinks.spark.channel = MemChannel

python脚本在阳光下运行良好,我可以看到json数据正在打印。但当我通过flume执行它时,下沉数据会引发低于警告的消息。在

警告信息

^{pr2}$

数据_收集器.py在

def process():
    for k, v in news_source.items():
        feeds = feedparser.parse(v)
        for e in feeds.entries:
            doc = json.dumps(
                {"news_provider": k, "title": e.title.strip(), "summary": BeautifulSoup(e.summary, 'lxml').text.strip(),
                 "id": e.id.strip(), "published": e.published if e.has_key('published') else None})
            print("%s"%doc)

流媒体脚本

def func():
    sc = SparkContext(master="local[*]", appName="App")
    ssc  = StreamingContext(sc, 300)
    flume_strm = FlumeUtils.createStream(ssc, "localhost", 9999)

    lines = flume_strm.map(lambda v: json.loads(v[1]))
    lines.pprint()
    ssc.start()
    ssc.awaitTermination()

使用的命令

bin/flume-ng agent --conf conf --conf-file libexec/conf/test.conf --name Agent -Dflume.root.logger=INFO,console

spark-submit --packages org.apache.spark:spark-streaming-flume_2.11:2.2.0  path_to/streaming_script.py

我无法删除这些警告消息,我希望使用pprint()在spark日志中打印相同的json数据,稍后我可以相应地处理这些消息。在

我在阅读流式内容时是否缺少任何特定的配置? 我需要指定任何特定的编码器吗?在

感谢任何帮助。在


Tags: the数据脚本jsonconfchannelsparkchannels
1条回答
网友
1楼 · 发布于 2024-10-07 00:30:04

我一定和你看了同样的教程。我尝试了很多不同的选择。大多数都没有成功。不过,我找到了一个解决办法:在水槽.conf然后像你现在做的那样调用脚本。但是,在python脚本中,将数据写入文件。然后“cat”脚本之前的文件(数据_收集器.py)停止执行。在

我认为这是因为exec源需要“流式”数据,而简单地打印输出是行不通的。在

我的设置和你的非常相似:

在流.py(为便于理解,删除了逻辑):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils

if __name__ == "__main__":
    sc = SparkContext(appName="test");
    ssc = StreamingContext(sc, 30)
    stream = FlumeUtils.createStream(ssc, "127.0.0.1", 55555)
    stream.pprint()

这是我的数据_收集器.py(注意最后一行的“cat”命令):

^{pr2}$

这是我的水槽.conf公司名称:

# list sources, sinks and channels in the agent
agent.sources = tail-file
agent.channels = c1
agent.sinks=avro-sink

# define the flow
agent.sources.tail-file.channels = c1
agent.sinks.avro-sink.channel = c1
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000

# define source and sink
agent.sources.tail-file.type = exec
agent.sources.tail-file.command =  python /home/james/Desktop/testing/data_collector.py
agent.sources.tail-file.channels = c1
agent.sinks.avro-sink.type = avro
agent.sinks.avro-sink.hostname = 127.0.0.1
agent.sinks.avro-sink.port = 55555

所以基本上在我的数据里_收集器.py,我只是做任何需要做的逻辑,把它写到一个名为执行.txt,然后立即“猫”文件。它起作用了。。。祝你好运

相关问题 更多 >