<p>Is it possible to write Python-like scripting in logstash? I can use logstash to import CSV data into elasticsearch, but I need to use the update API instead of simply indexing all the rows.</p>
<p>Here is a sample of my CSV file...</p>
<pre><code>vi /tmp/head.txt
"Home","Home-66497273a5a83c99","Spice Xlife 350, 3.5inch Android, bit.ly/1VSZj","919359000000","HMSHOP","916265100000","2016-05-18 08:41:49"
"Home","Home-26497273a5a83c99","Spice Xlife 350, 3.5inch Android, bit.ly/1V1","919359000001","HMSHOP","916265100000","2016-05-18 18:41:49"
"Home","Home-36497273a5a83c99","Spice Xlife 350, 3.5inch Android, bit.ly/SZj1","919359000001","HMSHOP","916265100000","2016-05-18 12:41:49"
"Home","Home-46497273a5a83c99","Spice Xlife 350, 3.5inch Android, bit.ly/1","919359000000","HMSHOP","916265100000","2016-05-18 14:41:49"
"Home","Home-56497273a5a83c99","Spice Xlife 350, 3.5inch Android, bit.ly/1VSZj1xc","919359000000","HMSHOP","916265100000","2016-05-18 16:41:49"
</code></pre>
<p>Here is the logstash config file...</p>
^{pr2}$
<p>I have confirmed that the above configuration works as expected, and all 5 records are stored as 5 separate documents.</p>
<p>Here is my docker command...</p>
<pre><code>docker run -d -v "/tmp/logstash.conf":/usr/local/logstash/config/logstash.conf -v /tmp/:/tmp/ logstash -f /usr/local/logstash/config/logstash.conf
</code></pre>
<hr/>
<p>The problem is that I need to merge documents based on the destination number. The destination should be the document's ID. Some rows share the same destination. For example, the document with _id 919359000001 should contain the following two records as nested objects.</p>
<pre><code>"user": "Home", "messageid": "Home-26497273a5a83c99", "message": "Spice Xlife 350, 3.5inch Android, bit.ly/1V1", "code": "HMSHOP", "mobile": "916265100000", "mytimestamp" : "2016-05-18 18:41:49"
"user": "Home", "messageid" "Home-36497273a5a83c99", "message": "Spice Xlife 350, 3.5inch Android, bit.ly/SZj1", "code": "HMSHOP", "mobile": "916265100000", "mytimestamp": "2016-05-18 12:41:49"
</code></pre>
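<p>To illustrate the grouping this requires, here is a minimal Python sketch (the field names are assumptions inferred from the CSV columns above; <code>destination</code> is the phone-number column used as the <code>_id</code>):</p>

```python
from collections import defaultdict

# Assumed column order, matching the CSV sample above
FIELDS = ["user", "messageid", "message", "destination", "code", "mobile", "mytimestamp"]

rows = [
    ["Home", "Home-26497273a5a83c99", "Spice Xlife 350, 3.5inch Android, bit.ly/1V1",
     "919359000001", "HMSHOP", "916265100000", "2016-05-18 18:41:49"],
    ["Home", "Home-36497273a5a83c99", "Spice Xlife 350, 3.5inch Android, bit.ly/SZj1",
     "919359000001", "HMSHOP", "916265100000", "2016-05-18 12:41:49"],
]

docs = defaultdict(list)
for row in rows:
    record = dict(zip(FIELDS, row))
    dest = record.pop("destination")   # the destination becomes the document _id
    docs[dest].append(record)          # remaining fields become a nested child record

# docs["919359000001"] now holds the two nested records shown above
```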
<p>Elasticsearch correctly converts the CSV data to JSON, as shown above. What I need is to reformat the statements into a script that uses the update API.
The following code works correctly.</p>
<pre><code>POST /test_index/doc/_bulk
{ "update" : { "_id" : "919359000001"} }
{ "script" : { "inline": "ctx._source.parent += ['user': user, 'messageid': messageid, 'message': message, 'code': code, 'mobile': mobile, 'mytimestamp': mytimestamp]", "lang" : "groovy", "params" : {"user": "Home", "messageid": "Home-26497273a5a83c99", "message": "Spice Xlife 350, 3.5inch Android, bit.ly/1V1", "code": "HMSHOP", "mobile": "916265100000", "mytimestamp" : "2016-05-18 18:41:49"}}, "upsert": {"parent" : [{"user": "Home", "messageid": "Home-26497273a5a83c99", "message": "Spice Xlife 350, 3.5inch Android, bit.ly/1V1", "code": "HMSHOP", "mobile": "916265100000", "mytimestamp" : "2016-05-18 18:41:49"}] }}
{ "update" : { "_id" : "919359000001"} }
{ "script" : { "inline": "ctx._source.parent += ['user': user, 'messageid': messageid, 'message': message, 'code': code, 'mobile': mobile, 'mytimestamp': mytimestamp]", "lang" : "groovy", "params" : {"user": "Home", "messageid": "Home-36497273a5a83c99", "message": "Spice Xlife 350, 3.5inch Android, bit.ly/1V13343", "code": "HMSHOP", "mobile": "916265100000", "mytimestamp" : "2016-05-18 12:41:49"}}, "upsert": {"parent" : [{"user": "Home", "messageid": "Home-36497273a5a83c99", "message": "Spice Xlife 350, 3.5inch Android, bit.ly/1V13343", "code": "HMSHOP", "mobile": "916265100000", "mytimestamp" : "2016-05-18 12:41:49"}] }}
</code></pre>
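<p>For comparison, a Python sketch of building the same pair of NDJSON bulk lines for one record. This is a standalone helper written for illustration, not part of the logstash pipeline:</p>

```python
import json

# Same Groovy script as in the working _bulk request above
GROOVY = ("ctx._source.parent += ['user': user, 'messageid': messageid, "
          "'message': message, 'code': code, 'mobile': mobile, "
          "'mytimestamp': mytimestamp]")

def bulk_update_lines(doc_id, record):
    """Return the action line plus the scripted-upsert body line for one record."""
    action = {"update": {"_id": doc_id}}
    body = {
        "script": {"inline": GROOVY, "lang": "groovy", "params": record},
        "upsert": {"parent": [record]},
    }
    return json.dumps(action) + "\n" + json.dumps(body) + "\n"
```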
<p>How do I write the logstash configuration to transform my CSV data as shown above?</p>
<hr/>
<p><strong>Update</strong></p>
<p>I have Python code that works correctly. I would like to know how to modify it to suit the "output" parameters suggested in the answer.
In the example below, <code>df_json</code> is a Python object, simply a Python dataframe flattened into JSON.</p>
<pre><code>import copy
import json

with open('myfile.txt', 'w') as f:
    for doc1 in df_json:
        doc = mydict(doc1)
        docnew = copy.deepcopy(doc)
        del docnew['destination']
        action = '{ "update": {"_id": %s }}\n' % doc['destination']
        f.write(action)
        entry = '{ "script" : { "inline": "ctx._source.parent += [\'user\': user, \'messageid\': messageid, \'message\': message, \'code\': code, \'mobile\': mobile, \'mytimestamp\': mytimestamp]", "lang" : "groovy", "params" : %s}, "upsert": {"parent" : [%s ] }}\n' % (doc, docnew)
        f.write(entry)

! curl -s -XPOST XXX.xx.xx.x:9200/test_index222/doc/_bulk --data-binary @myfile.txt; echo
</code></pre>
<hr/>
<p><strong>Update 2</strong></p>
<p>I tried the following configuration, and it is replacing (not updating via the script) the documents.</p>
<pre><code>output {
elasticsearch {
action => "index"
hosts => ["172.17.0.1"]
document_id => "%{destination}"
index => "logstash3-%{+YYYY.MM.dd}"
workers => 1
script => "ctx._source.parent += ['user': user, 'messageid': messageid, 'message': message, 'code': code, 'mobile': mobile, 'mytimestamp': mytimestamp]"
script_type => "inline"
script_lang => "groovy"
scripted_upsert => "true"
}
}
</code></pre>
<p>When I change the action to "update", I get the following error...</p>
<pre><code>:response=>{"update"=>{"_index"=>"logstash4-2016.07.29", "_type"=>"csv", "_id"=>"919359000000",
"status"=>400, "error"=>{"type"=>"illegal_argument_exception", "reason"=>"failed to execute script",
"caused_by"=>{"type"=>"script_exception", "reason"=>"failed to run in line script
[ctx._source.parent += ['user': user, 'messageid': messageid, 'message': message, 'code': code, 'mobile': mobile, 'mytimestamp': mytimestamp]]
using lang [groovy]", "caused_by"=>{"type"=>"missing_property_exception", "reason"=>"No such property: user for class: fe1b423dc4966b0f0b511b732474637705bf3bb1"}}}}}, :level=>:warn}
</code></pre>
<hr/>
<p><strong>Update 3</strong></p>
<p>Following Val's answer, I added <code>event</code>, and I got this error...</p>
<pre><code>:response=>{"update"=>{"_index"=>"logstash4-2016.08.06", "_type"=>"csv", "_id"=>"%{destination}", "status"=>400, "error"=>{"type"=>"illegal_argument_exception", "reason"=>"failed to execute script", "caused_by"=>{"type"=>"script_exception", "reason"=>"failed to run inline script [ctx._source.parent += ['user': event.user, 'messageid': event.messageid, 'message': event.message, 'code': event.code, 'mobile': event.mobile, 'mytimestamp': event.mytimestamp]] using lang [groovy]", "caused_by"=>{"type"=>"null_pointer_exception", "reason"=>"Cannot execute null+{user=null, messageid=null, message=, code=null, mobile=null, mytimestamp=null}"}}}}}
</code></pre>
<p><strong>Update 4</strong></p>
<p>Following Val's latest answer, I tried...</p>
<pre><code>script => "ctx._source.parent = (ctx._source.parent ?: []) + ['user': event.user, 'messageid': event.messageid, 'message': event.message, 'code': event.code, 'mobile': event.mobile, 'mytimestamp': event.mytimestamp]"
</code></pre>
<p>and got this error:</p>
<pre><code>{:timestamp=>"2016-08-12T09:40:48.869000+0000", :message=>"Pipeline main started"}
{:timestamp=>"2016-08-12T09:40:49.517000+0000", :message=>"Error parsing csv", :field=>"message", :source=>"", :exception=>#<NoMethodError: undefined method `each_index' for nil:NilClass>, :level=>:warn}
</code></pre>
<p>Only 2 records were added to the database.</p>
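<p>The <code>Error parsing csv</code> warning with <code>:source=&gt;""</code> suggests an empty event (for example a blank or trailing line in the input file) reaching the csv filter. One possible guard, sketched here as an assumption rather than a confirmed fix, is to drop blank events before they are parsed:</p>

```
filter {
  # Drop blank lines before the csv filter sees them (sketch)
  if [message] =~ /^\s*$/ {
    drop { }
  }
  # ... existing csv filter follows unchanged ...
}
```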