Incorrect data mapping in a CSV file written from pyspark?

Published 2024-06-26 12:55:02


I am running a SQL query on top of a view and trying to save the query output to a CSV file using pyspark. When validating the file, I found that some records do not land in their respective columns. The affected columns are PARNT_CASE_NMBR | CASE_NMBR. Is this a pyspark issue? If so, how can I fix it?

I am saving the query output to the file with the following line:

df.repartition(1).write.format('com.databricks.spark.csv').mode('overwrite').save(row['TargetPath'], quote='',sep='|',header='True',nullValue=None)
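A likely cause (an assumption, since the underlying query and data are not shown): `quote=''` disables quoting entirely, so any field that itself contains the `|` separator pushes everything after it into the wrong columns. Spark's CSV writer accepts `quote` and `escape` options precisely to protect such fields. The effect can be sketched with the stdlib `csv` module (an analogy, not Spark itself):

```python
import csv
import io

# A record whose last field contains the pipe delimiter itself,
# as the long free-text CASE_NOTES field in the question does.
row = ["5000B00000XdxnQQAR", "Closed", "From: a|b; note text"]

# With quoting disabled (the effect of quote='' in the writer options),
# the field is emitted verbatim and the embedded '|' creates a bogus
# column boundary on read: 4 columns come back instead of 3.
unquoted = "|".join(row)
assert len(unquoted.split("|")) == 4

# With quoting enabled, the offending field is wrapped in quotes,
# and a csv reader recovers the original three columns intact.
buf = io.StringIO()
csv.writer(buf, delimiter="|", quoting=csv.QUOTE_MINIMAL).writerow(row)
recovered = next(csv.reader(io.StringIO(buf.getvalue()), delimiter="|"))
assert recovered == row
```

In Spark terms, leaving the default `quote='"'` in place (or setting an `escape` character) instead of `quote=''` should keep delimiter-containing fields in their own columns.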

Output:

CASE_ID|PARNT_CASE_ID|CREATED_DT|RECORD_TYPE|THREPTC_ID|CHANNEL_ID|SRC|SUB_SRC|ACCT_ID|ADDR_ID|PROD_ID|TERRITORY_ID|CC_LOCTN_GRP_ID|**PARNT_CASE_NMBR|CASE_NMBR**|CATEGORY_ID|CTGRY|SUBCAT_ID|SUB_CTGRY|CATG_ID_MIR|CATG_MIR|SUBCAT_ID_MIR|SUBCAT_MIR|FIRST_NAME|LAST_NM|CUSTOMER_TYPE|PRIM_RPRTR_TYPE|ACCT_TYPE|ACCT_SUB_TYPE|QSTN|STLMNT_RSN|STLMNT_TYPE|USR_ERR_ISSUE|RPRTR_NM|RRA_NO|JJ_SECRY_RPTR_EMAIL|JJ_SECRY_RPTR_OTHR_TYP|QUICK_CD_TITLE|QUICK_CODE|QUICK_SUB_CTGRY|QUICKCDE_CATGRY|RPRTR_ID|SEC_FIRST_NAME|SEC_LAST_NM|SECNDARY_REPORTER_TYPE|ADDR_LINE_1|ADDR_LINE_2|CITY|STATE|ZIP|CNTRY|SEC_ADDR|SEC_ADDR_LINE_1|SEC_ADDR_LINE_2|SEC_CITY|SEC_STATE|ZIP_SEC|CNTRY_SEC|LAST_MOD_DT|SECTOR_CRTD_BY|SECTOR_RQSTD_BY|RQST_TYP|SRVY_FLG|SRCH_TERMS|CONSULT_FLG|ESCLTN_DT|ESCALTD|ESCLTD_TO|MIA_OWNR|CLOSED|CLOSED_DT|SPCL_EVENT|CMMNTS|CASE_REASON|OVRD_REASON|KRA_MET_FLG|THRPTC_AREA|LABEL_TYPE|HLTH_POLCY_DESCN_MKR|PQC_NMBR|J_J_FIRST_NTFYD|CCC_NOTFD_DT|PRCTC_NM|SENT_TO_PRTNR|AE_PQC_RPTD|SAFETY_SRVCS_PRTNR_NO|DCMNT_ITEM_NMBR|PROD_QTY|ITEM_NM|JJ_PURGE|NRRTV|RPRTR_IS_PATIENT|FLFLMNT_DT|TRGT_AUDI|SLTTN|AE_PARTNER_CASE|RELEVANT_DATA_AVAIL|RQST_DT|EMAIL|LOT_NMBR|ANON|RPRT_TYPE|FLLW_UP_SQNC_NMBR|DEL_FLAG|DSTRBTN_LIST|CREATE_AE|CREATE_PQ|CASE_NOTES|OWNER_ID|APPROVER|COMPONENT_IDS|CREATED_BY_ID|STATUS|SUB_STATUS|EMPLY_NAME|IM_INSERT_DT
5000B00000WmmZNQAZ|5000B00000WmkZaQAJ|2016-08-29 18:58:50.0|012U0000000QIzaIAG||61|Phone|Xarelto Carepath|001U000001jrip7IAA|a01U000001W484XIAR|1645||51|**00671609|00672036**|2301||1680||2301||1680||Information Redacted|Information Redacted|Consumer|Employee|Consumer|Consumer|Is there any information on using lavender or eucalyptus in a diffuser while on XARELTO?|||||a3tU0000000CpBhIAK|||||||||||Information Redacted||Frisco|TX|75034|USA|||||||USA|2017-09-12 16:50:17.0|MIC|Janssen Pharmaceuticals||N||||N|||Y|2016-08-29 20:11:34.0|||||N|Cardiovascular||N|||||||||||N||N|2016-08-29 18:59:59.0|Consumer|Mr.|||2016-08-29 00:00:00.0|Information Redacted||N|Initial||N||N|N||005U0000004Lo5jIAC|005U0000004Lo5jIAC|XAR-CV-Cons PI;XARELTO - Package Insert Information [and Medication Guide];Xarelto##ENC-010330-11;Xarelto (rivaroxaban) Prescribing Information - May 2016 ;Xarelto|005U0000004Lo5jIAC|Closed|Fulfilled|0010B00001rneZ5QAI|1579623290022000

Incorrect output in the file:

5000B00000XdxnQQAR|||||288|||||||20||Janssen Pharmaceutical Canada 19 Green Belt Dr., Toronto, ON,  M3C1L9 Office: +1 (416) 382 5182|||766|Cancelled|2077|Janssen Canada|1680|||N|N|Initial|005U0000003U3PYIA0||N|||||2016-09-28 00:00:00.0||N||Clinical Use|||||||||**00694588**|||||||||||N|||Canada|2016-10-03 20:56:44.0||N||annvbertrand@gmail.com||||||N|||N|0||Email|||N||CCC|||||||||Comparative Use|Y|Y||N|||Ms.|Y|N||2016-09-27 13:30:28.0||Comparison of Bioequivalent MPH ER Preparations|HCP|"From: Azadtalab, Maysam [JOICA Non-J&J] Sent: Monday, September 26, 2016 1:42 PM To: MedInfo Canada [JOICA] Subject: Request for studies  Hi MedInfo,  Could you please send the following information:  Request:  - Difference between brand and generic - OROS tech with Graph - Fallu study - Van Stralen study  Product:  CONCERTA  Reporter Type:  Pharmacist  Salutation:  Ms.  First Name:   Ann  Last Name:  Bertrand  Province:  ON  Email/Address/Fax:  annvbertrand@gmail.com<mailto:annvbertrand@gmail.com>  Language:  English  Fulfillment Mode:  Email  Employee Name:  Maysam Azadtalab   Thanks Maysam Azadtalab Healthcare Relationship Specialist - Concerta(r)||||||On|N|1579623290022000

5000B00000XdxnQQAR|||||25|||||||20||Janssen Pharmaceutical Canada 19 Green Belt Dr., Toronto, ON,  M3C1L9 Office: +1 (416) 382 5182|||766|Cancelled|2077|Janssen Canada|1680|||N|N|Initial|005U0000003U3PYIA0||N|||||2016-09-28 00:00:00.0||N||Clinical Use|||||||||**00694588**|||||||||||N|||Canada|2016-10-03 20:56:44.0||N||annvbertrand@gmail.com||||||N|||N|0||Email|||N||CCC|||||||||Comparative Use|Y|Y||N|||Ms.|Y|N||2016-09-27 13:30:28.0||Comparison of Bioequivalent MPH ER Preparations|HCP|"From: Azadtalab, Maysam [JOICA Non-J&J] Sent: Monday, September 26, 2016 1:42 PM To: MedInfo Canada [JOICA] Subject: Request for studies  Hi MedInfo,  Could you please send the following information:  Request:  - Difference between brand and generic - OROS tech with Graph - Fallu study - Van Stralen study  Product:  CONCERTA  Reporter Type:  Pharmacist  Salutation:  Ms.  First Name:   Ann  Last Name:  Bertrand  Province:  ON  Email/Address/Fax:  annvbertrand@gmail.com<mailto:annvbertrand@gmail.com>  Language:  English  Fulfillment Mode:  Email  Employee Name:  Maysam Azadtalab   Thanks Maysam Azadtalab Healthcare Relationship Specialist - Concerta(r)||||||On|N|1579623290022000
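Note that the misaligned rows above carry a long free-text field (an email body containing line breaks and `|` characters). With quoting disabled, the writer cannot protect such a field, which is the likely reason the record spills across lines and columns. A minimal stdlib illustration (again an analogy, not Spark itself) of how an embedded newline splits one logical record into two physical lines:

```python
import csv
import io

# One record whose last field contains a line break, like the
# email body in the CASE_NOTES column above.
record = ["5000B00000XdxnQQAR", "Cancelled", "Hi MedInfo,\nplease send the studies"]

# Quoting disabled: the newline is written verbatim, so the single
# record now occupies two physical lines in the file.
raw = "|".join(record)
assert len(raw.splitlines()) == 2

# Quoting enabled: the field is wrapped in quotes and a csv reader
# reassembles the record across the line break.
buf = io.StringIO()
csv.writer(buf, delimiter="|", quoting=csv.QUOTE_MINIMAL).writerow(record)
back = next(csv.reader(io.StringIO(buf.getvalue()), delimiter="|"))
assert back == record
```

Whatever tool later reads the file must also be configured to parse quoted multi-line fields (in Spark's CSV reader that is the `multiLine` option).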

2 Answers

If you are using Spark 2.0+, you can write CSV directly; see How to export a table dataframe in PySpark to csv?

df.repartition(1).write.csv(row['TargetPath'])

Doing a select on the dataframe before writing may solve your problem:

df.repartition(1).select('CASE_ID','PARNT_CASE_ID'...'IM_INSERT_DT').write.format('com.databricks.spark.csv').mode('overwrite').save(row['TargetPath'], quote='',sep='|',header='True',nullValue=None)

You just need to use the same options function that you would use when creating a dataframe from a CSV file:

from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("SparkTest").master("local[*]").getOrCreate()

sampleDF=spark.createDataFrame([("manoj","gwalior","mp"),("kumar","delhi","delhi"),("dhakad","chennai","tn")],["name","city","state"])

sampleDF.coalesce(1).write.mode("overwrite").format("csv").option("delimiter","|").option("header","true").save("temp")

Sample output:

name|city|state
manoj|gwalior|mp
kumar|delhi|delhi
dhakad|chennai|tn
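The point about matching options can be illustrated with the stdlib `csv` module (a plain-Python analogy, not Spark itself): a file written with `|` as the delimiter only maps back to the right columns when it is read with that same delimiter.

```python
import csv
import io

rows = [["name", "city", "state"],
        ["manoj", "gwalior", "mp"],
        ["kumar", "delhi", "delhi"]]

# Write pipe-delimited data, mirroring option("delimiter", "|") above.
buf = io.StringIO()
csv.writer(buf, delimiter="|").writerows(rows)
data = buf.getvalue()

# Read back with the matching delimiter: columns line up.
same = list(csv.reader(io.StringIO(data), delimiter="|"))
assert same == rows

# Read back with the default ',' delimiter: each line collapses into a
# single field, i.e. the data no longer maps to its columns.
wrong = list(csv.reader(io.StringIO(data)))
assert wrong[0] == ["name|city|state"]
```

The same symmetry applies to the `header` and quoting options when the file is consumed downstream.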

If you want to change the order of the columns, select the columns in that order before writing.
