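<p>I solved this in an inelegant way: keep a counter of the lines that have been queued but not yet processed, and pause reading the input whenever that counter exceeds a limit.</p>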
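<pre><code>import threading
import time
from multiprocessing import Pool

def processLine(lines):
    # Placeholder: process one batch and return one result per input line.
    return lines

def collectResults(result):
    # Runs in a helper thread of the parent process once a batch finishes.
    global counter
    with counterLock:
        counter -= len(result)

counter = 0  # lines submitted but not yet processed
counterLock = threading.Lock()

if __name__ == '__main__':
    pool = Pool(processes=8)
    lines = []
    for line in inputStream:  # inputStream: the line iterator from the question
        lines.append(line)
        if len(lines) >= 5000:
            # Don't queue more than 1,000,000 lines.
            while counter > 1000000:
                time.sleep(0.05)
            with counterLock:
                counter += len(lines)
            # args must be a tuple: (lines,) and not (lines)
            pool.apply_async(processLine, args=(lines,), callback=collectResults)
            lines = []
    if lines:  # submit the final partial batch
        pool.apply_async(processLine, args=(lines,), callback=collectResults)
    pool.close()
    pool.join()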
</code></pre>
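<p>A possibly cleaner variant of the same idea is to replace the sleep-and-poll loop with a semaphore, so the reader blocks until a slot frees up. The sketch below is only an illustration under some assumptions: <code>run_bounded</code>, <code>process_batch</code>, <code>batch_size</code> and <code>max_in_flight</code> are hypothetical names that do not appear in the original answer, and the limit is counted in batches rather than lines (200 batches of 5000 lines is roughly the same 1,000,000-line cap).</p>

```python
import threading
from multiprocessing.pool import ThreadPool


def run_bounded(pool, process_batch, input_stream, batch_size=5000, max_in_flight=200):
    """Feed batches to `pool`, blocking while `max_in_flight` batches are pending.

    `pool` is any object with the multiprocessing.Pool interface
    (apply_async, close, join). Results come back in completion order.
    """
    slots = threading.BoundedSemaphore(max_in_flight)
    results = []

    def on_done(result):
        # Called in the parent process when a batch succeeds.
        results.append(result)
        slots.release()

    def on_error(exc):
        # Free the slot on failure too, otherwise the reader could block forever.
        slots.release()

    def submit(batch):
        slots.acquire()  # blocks here instead of sleep-polling a counter
        pool.apply_async(process_batch, args=(batch,),
                         callback=on_done, error_callback=on_error)

    batch = []
    for line in input_stream:
        batch.append(line)
        if len(batch) >= batch_size:
            submit(batch)
            batch = []
    if batch:  # final partial batch
        submit(batch)

    pool.close()
    pool.join()
    return results
```

<p>With a process pool, this would be called as <code>run_bounded(Pool(processes=8), processLine, inputStream)</code> under an <code>if __name__ == '__main__':</code> guard, with <code>processLine</code> defined at module level so it can be pickled. A <code>ThreadPool</code> accepts the same calls and is handy for trying the function out quickly.</p>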