<p>If you only need to produce the expected output, the following code also works:</p>
<pre><code>import pandas as pd
from io import StringIO

YOUR_TXT_DATA = """\
1|A|-1|10|100|1|2
1|A|2|20|35|2|3
1|B|1|15|5|3|5
2|B|5|23|25|4|2
2|B|2|33|20|22|98
2|D|4|23|21|20|32
"""

# The data has no header row; keep only the first five columns.
df = pd.read_csv(StringIO(YOUR_TXT_DATA), header=None,
                 usecols=[_ for _ in range(0, 5)],
                 names=['key1', 'key2', 'group', 'v1', 'v2'],
                 sep='|')

result_dict = dict(comboKey1=[], new_v1=[])
for key1, key2, group, v1, v2 in df.values:
    key = str(key1) + '_' + str(key2)
    if key not in result_dict['comboKey1']:
        # First time this combined key is seen: start a new dict for it.
        result_dict['comboKey1'].append(key)
        result_dict['new_v1'].append({str(group): [v1, v2]})
    else:
        # Key already present: merge this group into its existing dict.
        index = result_dict['comboKey1'].index(key)
        result_dict['new_v1'][index].update({str(group): [v1, v2]})

result_df = pd.DataFrame.from_dict(result_dict)
print(result_df)
</code></pre>
<p>Output:</p>
<pre><code> comboKey1 new_v1
0 1_A {'-1': [10, 100], '2': [20, 35]}
1 1_B {'1': [15, 5]}
2 2_B {'5': [23, 25], '2': [33, 20]}
3 2_D {'4': [23, 21]}
</code></pre>
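<p>The loop above looks each key up with <code>list.index</code>, which rescans the list on every row. As a minimal sketch of the same accumulation, assuming the rows are available as plain 5-tuples (the helper name <code>combine</code> and the hard-coded <code>rows</code> are illustrative, not part of the original code), a dict keyed by the combined key avoids that lookup:</p>

```python
def combine(rows):
    """Group rows by 'key1_key2'; within a key, a repeated group is overwritten."""
    combined = {}  # plain dicts keep insertion order (Python 3.7+)
    for key1, key2, group, v1, v2 in rows:
        key = f"{key1}_{key2}"
        # setdefault creates the inner dict the first time a key is seen
        combined.setdefault(key, {})[str(group)] = [v1, v2]
    return combined


# Hypothetical stand-in for the first five columns of the sample data
rows = [
    (1, 'A', -1, 10, 100),
    (1, 'A', 2, 20, 35),
    (1, 'B', 1, 15, 5),
    (2, 'B', 5, 23, 25),
    (2, 'B', 2, 33, 20),
    (2, 'D', 4, 23, 21),
]

combined = combine(rows)
for key, value in combined.items():
    print(key, value)
```

<p>The resulting mapping can be turned back into two columns by passing its keys and its values to <code>pd.DataFrame</code>, if a DataFrame is still wanted.</p>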
<hr/>
<h2>About the test data</h2>
<p>I think there are some special cases to consider. Suppose the data looks like this:</p>
<pre><code>key1|key2|group|v1|v2|v3|v4
1|A|-1|10|100|1|2
1|A|-1|10|100|1|2
1|A|-1|20|35|2|3
</code></pre>
<p>What output do you expect? (Cases 1 to 3)</p>
<ul>
<li>Case 1: the last row wins: <code>1_A {'-1': [20, 35]}</code> (solution: dict)</li>
<li>Case 2: keep every distinct row, without duplicates: <code>{('-1', (10, 100)), ('-1', (20, 35))}</code> (solution: set)</li>
<li>Case 3: keep everything: <code>1_A [('-1', (10, 100)), ('-1', (10, 100)), ('-1', (20, 35))]</code> (solution: list)</li>
</ul>
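<p>The three cases come down to which container collects the rows. A small pandas-free sketch (the variable names are illustrative) shows the difference on the three sample rows:</p>

```python
# The (group, (v1, v2)) pairs from the three sample rows above
pairs = [('-1', (10, 100)), ('-1', (10, 100)), ('-1', (20, 35))]

# Case 1: dict. Assigning to an existing key overwrites it, so the last row wins.
as_dict = {}
for group, values in pairs:
    as_dict[group] = list(values)

# Case 2: set. Exact duplicates collapse; elements must be hashable,
# which is why the values are tuples rather than lists.
as_set = set(pairs)

# Case 3: list. Every row is kept, in order, duplicates included.
as_list = list(pairs)

print(as_dict)
print(len(as_set))
print(as_list)
```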
<p>Code:</p>
<pre><code>from unittest import TestCase
import pandas as pd
from io import StringIO

OTHER_TXT_DATA = """\
1|A|-1|10|100|1|2
1|A|-1|10|100|1|2
1|A|-1|20|35|2|3
"""


class MyTests(TestCase):
    def __init__(self, *args, **options):
        super().__init__(*args, **options)
        self.df = pd.read_csv(StringIO(OTHER_TXT_DATA), header=None,
                              usecols=[_ for _ in range(0, 5)],
                              names=['key1', 'key2', 'group', 'v1', 'v2'],
                              sep='|')

    def setUp(self) -> None:
        # Start every test case with an empty result.
        self.result_dict = dict(comboKey1=[], new_v1=[])

    def solution_base(self, new_v1_fun, update_v1_fun) -> pd.DataFrame:
        # new_v1_fun handles the first row of a key; update_v1_fun handles later rows.
        result_dict = self.result_dict
        for key1, key2, group, v1, v2 in self.df.values:
            key = str(key1) + '_' + str(key2)
            if key not in result_dict['comboKey1']:
                result_dict['comboKey1'].append(key)
                new_v1_fun(group, v1, v2)  # result_dict['new_v1'].append({str(group): [v1, v2]})
            else:
                index = result_dict['comboKey1'].index(key)
                update_v1_fun(index, group, v1, v2)  # result_dict['new_v1'][index].update({str(group): [v1, v2]})
        df = pd.DataFrame.from_dict(result_dict)
        print(df)
        return df

    def test_case_1_dict(self):
        # Case 1: a dict keeps only the last value seen for each group.
        df = self.solution_base(
            new_v1_fun=lambda group, v1, v2: self.result_dict['new_v1'].append({str(group): [v1, v2]}),
            update_v1_fun=lambda index, group, v1, v2: self.result_dict['new_v1'][index].update({str(group): [v1, v2]}))
        self.assertTrue(df.equals(pd.DataFrame(
            columns=['comboKey1', 'new_v1'],
            data=[
                ['1_A', {'-1': [20, 35]}],
            ]
        )))

    def test_case_2_set(self):
        # Case 2: a set keeps each distinct (group, values) pair once.
        df = self.solution_base(
            new_v1_fun=lambda group, v1, v2: self.result_dict['new_v1'].append({(str(group), (v1, v2))}),
            update_v1_fun=lambda index, group, v1, v2: self.result_dict['new_v1'][index].add((str(group), (v1, v2))))
        self.assertTrue(df.equals(pd.DataFrame(
            columns=['comboKey1', 'new_v1'],
            data=[
                ['1_A', {('-1', (20, 35)), ('-1', (10, 100))}],
            ]
        )))

    def test_case_3_list(self):
        # Case 3: a list keeps every row, duplicates included.
        df = self.solution_base(
            new_v1_fun=lambda group, v1, v2: self.result_dict['new_v1'].append([(str(group), (v1, v2))]),
            update_v1_fun=lambda index, group, v1, v2: self.result_dict['new_v1'][index].append((str(group), (v1, v2))))
        self.assertTrue(df.equals(pd.DataFrame(
            columns=['comboKey1', 'new_v1'],
            data=[
                ['1_A', [('-1', (10, 100)), ('-1', (10, 100)), ('-1', (20, 35))]],
            ]
        )))
</code></pre>
<p><strong>Note:</strong> Python 2 does not support the annotation syntax used above, such as <code>-> None</code> (see <a href="https://www.python.org/dev/peps/pep-0484/" rel="nofollow noreferrer">PEP484</a>).</p>