Python多进程写入JSON文件

2024-05-03 12:31:16 发布

您现在位置:Python中文网/ 问答频道 /正文

我需要从 REST API 读取有序字典(OrderedDict),如下所示:

OrderedDict([('_bkt', 'ii~1809~C76785F7-95DF-4D1D-A5E7-A2202947734C'), ('_cd', '1809:718727061')])
OrderedDict([('_bkt', 'ii~1809~C76785F7-95DF-4D1D-A5E7-A2202947735C'), ('_cd', '1809:718727063')])
OrderedDict([('_bkt', 'ii~1809~C76785F7-95DF-4D1D-A5E7-A2202947736C'), ('_cd', '1809:718727065')])
OrderedDict([('_bkt', 'ii~1809~C76785F7-95DF-4D1D-A5E7-A2202947737C'), ('_cd', '1809:718727067')])

我的需求是读取这些有序字典,并以多进程方式将其转换为JSON格式后写入JSON文件。但我的代码无法正常工作,没有把JSON格式的数据写入目标文件。请给出建议。

代码如下:


from multiprocessing import Pool
from collections import OrderedDict
import simplejson as json

# A single sample record with the same shape as the records listed in the
# question.
rr = OrderedDict([('_bkt', 'ii~1809~C76785F7-95DF-4D1D-A5E7-A2202947734C'), ('_cd', '1809:718727061')])

# Output file, opened once in append mode when the module is loaded.
f = open('iitp222.json', "a")

def write_data(args):
    """Write *args* followed by a newline to the module-level file ``f``."""
    f.write(args + '\n')

###Get the results and display them using the ResultsReader.
if __name__ == "__main__":
    # NOTE(review): iterating an OrderedDict yields its keys, so ``result`` is
    # the string '_bkt' and then '_cd', not a whole record.
    for result in rr:
            print result
            # NOTE(review): a new Pool is created on every loop iteration.
            p = Pool()
            # NOTE(review): json.dumps() returns a single string and map()
            # iterates over it, so write_data is called once per character.
            # The calls run in pool worker processes, each presumably holding
            # its own copy of ``f``; whether their buffered writes ever reach
            # the file should be verified.
            result = p.map(write_data, json.dumps(result))
            p.close()
            p.join()
    f.close()

Tags: 数据 import json 字典 格式 cd result write
1条回答
网友
1楼 · 发布于 2024-05-03 12:31:16

我通过以下代码解决了这个问题:

#      Import Lib           -#
import multiprocessing as mp
import os
import sys
from collections import OrderedDict
from datetime import datetime

import simplejson as json
import splunklib.client as client
import splunklib.results as results

# Path of the output file, taken from the first command-line argument.
fn=sys.argv[1]
# Host and port handed to client.connect() when logging in.
HOST = "restapi.xxxx.com"
PORT = 443

# Minute of the current hour (zero-padded string), halved so that the value
# used for account selection changes every two minutes.
Cur_min1 = datetime.now().strftime('%M')
Cur_min = int(Cur_min1)/2

# Rotate between four accounts: the slot is the halved minute modulo 4, and
# each slot maps to one (username, password) pair.
_CREDENTIALS = (
    ("xxxxxx", "yyyyyy"),   # slot 0
    ("xxxxxx", "yyyyyy"),   # slot 1
    ("kuuuu1", "yyyyyy"),   # slot 2
    ("xxxx", "yyyyyy"),     # slot 3
)
USERNAME, PASSWORD = _CREDENTIALS[int(Cur_min) % 4]

# Create a Service instance, log in, start the export search and empty the
# output file. If any of these steps fails, a notification is sent through the
# Teams notification script and the run is aborted with a non-zero status.
try:
    service = client.connect(
        host=HOST,
        port=PORT,
        username=USERNAME,
        password=PASSWORD)
    rr = results.ResultsReader(service.jobs.export("search index=xxx host=yyyyyy* sourcetype=xxxx splunk_server=idx* earliest=-2m@m"))
    # Create (or truncate) the output file up front; the context manager
    # closes the handle even if something goes wrong.
    with open(fn, 'w'):
        pass
except Exception:
    # Catch Exception rather than using a bare except so that Ctrl-C and
    # SystemExit are not swallowed and reported as a connection failure.
    os.system("python /home/xxx/MS_TeamsNotification.py 'Unable to connect Splunk Rest-API'")
    # sys.exit() instead of the site-provided exit() helper; status 1 tells
    # the caller that the run failed.
    sys.exit(1)

###Get the results and display them using the ResultsReader.

def worker(arg, q):
    """Serialize *arg* to JSON, hand the text to the writer queue, return it.

    Args:
        arg: a JSON-serializable object (one search result).
        q: queue whose ``put`` receives the serialized string.

    Returns:
        The JSON string that was placed on the queue.
    """
    serialized = json.dumps(arg)
    q.put(serialized)
    return serialized

def listener(q):
    """Drain *q* into the output file, one message per line.

    The file named by the module-level ``fn`` is opened in write mode, so any
    existing content is discarded. Messages are written and flushed one at a
    time until the sentinel string 'kill' arrives, which ends the loop without
    being written.
    """
    with open(fn, 'w') as out:
        # iter(callable, sentinel) calls q.get() repeatedly and stops as soon
        # as it returns a value equal to 'kill'.
        for message in iter(q.get, 'kill'):
            out.write(str(message) + '\n')
            out.flush()

def main():
    """Fan search results out to pool workers and funnel them into one file.

    One pool task (``listener``) owns the output file and writes whatever
    arrives on a shared queue; every dict yielded by the module-level ``rr``
    is serialized by a ``worker`` task that puts its output on that queue.

    Raises:
        AssertionError: if ``rr.is_preview`` is not False after iteration.
        Exception: any error raised inside a worker or inside the listener is
            re-raised here instead of being silently dropped.
    """
    # The original author notes a Manager queue is required here; the queue
    # proxy is passed to pool tasks as an ordinary argument.
    manager = mp.Manager()
    q = manager.Queue()
    pool = mp.Pool(mp.cpu_count() + 2)

    # Start the single writer first so it is ready before any worker output
    # arrives. It occupies one pool process until it receives 'kill'.
    watcher = pool.apply_async(listener, (q,))

    try:
        # Fire off one worker task per result; items that are not dicts are
        # skipped.
        jobs = [
            pool.apply_async(worker, (result, q))
            for result in rr
            if isinstance(result, dict)
        ]
        assert rr.is_preview == False

        # Wait for every worker; .get() re-raises a worker's exception.
        for job in jobs:
            job.get()
    finally:
        # Always stop the listener and shut the pool down, even when a worker
        # failed, so the writer is not left waiting on the queue.
        q.put('kill')
        pool.close()
        pool.join()

    # Surface any error the listener hit (e.g. the output file could not be
    # opened); without this call such a failure would go unnoticed.
    watcher.get()

# Run the pipeline only when executed as a script, not when this module is
# imported.
if __name__ == "__main__":
    main()


相关问题 更多 >