在1000个大文件中筛选头2行和尾1行

2024-06-12 15:23:33 发布

您现在位置:Python中文网/ 问答频道 /正文

我在一个文件夹中列出了多个1000个大型文件

每个文件都有2个标题行和尾行

file1
H|*|F|*|TYPE|*|EXTRACT|*|Stage_|*|2021.04.18 07:35:26|##|
H|*|TYP_ID|*|TYP_DESC|*|UPD_USR|*|UPD_TSTMP|##|
E|*||*|CONNECTOR|*|2012.06.01 09:03:11|##|
H|*|Tracking|*|asdasasd|*|2011.03.04 11:50:51|##|
S|*|Tracking|*|asdasdas|*|2011.03.04 11:51:06|##|
T|*|3|*|2021.04.18 07:35:43|##|
file 2
H|*|F|*|PA__STAT|*|EXTRACT|*|Folder|*|2021.04.18 07:35:26|##|
H|*|STAT_ID|*|STAT_DESC|*|UPD_USR|*|UPD_TSTMP|##|
A|*|Active / Actif|*|1604872|*|2018.06.25 15:12:35|##|
D|*||*|CONNECTOR|*|2012.04.06 10:49:09|##|
I|*|Intermittent Leave|*|asdasda|*|2021.04.09 13:14:00|##|
L|*|On Leave|*|asdasasd|*|2011.03.04 11:49:40|##|
P|*|Paid Leave|*|asdasd|*|2011.03.04 11:49:56|##|
T|*|Terminated / Terminé|*|1604872|*|2018.06.25 15:13:06|##|
U|*||*|CONNECTOR|*|2012.06.16 09:04:14|##|
T|*|7|*|2021.04.18 07:35:55|##|

file3

H|*|K|*|PA_CPN|*|EXTRACT|*|SuccessFactors|*|2021.04.22 23:09:26|##|
H|*|COL_NUM|*|CPNT_TYP_ID|*|CPNT_ID|*|REV_DTE|##|
40|*|OLL|*|asdasdas|*|2019.01.21 14:07:00|##|
40|*|OLL|*|asdasda|*|2019.01.21 14:18:00|##|
40|*|OLL|*|asdasdas|*|2019.01.21 14:20:00|##|
T|*|3|*|2021.04.22 23:27:17|##|

我在以H ||和T |开头的行上应用了一个过滤器,但它拒绝了几行的数据

df_cleanse=spark.sql("select replace(replace(replace(value,'~','-'),'|*|','~'),'|##|','') as value from linenumber3 where value not like 'T|*|%' and value not like 'H|*|%'")

我知道我们可以使用zipwithindex,但我必须一个文件一个文件地读取,他们应用zip索引,然后对行进行过滤

for each file:
    df = spark.read.text('file1')
    #Adding index column each row get its row numbers , Spark distibutes the data and to maintain the order of data we need to perfrom this action
    df_1 = df.rdd.map(lambda r: r).zipWithIndex().toDF(['value', 'index'])
    df_1.createOrReplaceTempView("linenumber")
    spark.sql("select * from linenumber where index >1 and value.value not like 'T|*|%'")

请告知相同问题的最佳解决方案。我不想运行一个广泛的程序所有我需要的是突出删除3行。即使使用正则表达式删除行也可以,我们需要以这种格式处理TB的文件 由于文件大小的原因,排除了Unix命令和Sed运算符


Tags: 文件iddfconnectorvaluenotextractreplace
1条回答
网友
1楼 · 发布于 2024-06-12 15:23:33

同时,我等待您的回答,尝试删除前两行和最后一行:

from pyspark.sql.window import Window
import pyspark.sql.functions as f


df = spark.read.csv('your_path', schema='value string')

df = df.withColumn('filename', f.input_file_name())
df = df.repartition('filename')
df = df.withColumn('index', f.monotonically_increasing_id())

w = Window.partitionBy('filename')
df = (df
      .withColumn('remove', (f.col('index') == f.max('index').over(w)) | (f.col('index') < f.min('index').over(w) + f.lit(2)))
      .where(~f.col('remove'))
      .select('value'))

df.show(truncate=False)

输出

+                              -+
|value                                                        |
+                              -+
|E|*||*|CONNECTOR|*|2012.06.01 09:03:11|##|                   |
|H|*|Tracking|*|asdasasd|*|2011.03.04 11:50:51|##|            |
|S|*|Tracking|*|asdasdas|*|2011.03.04 11:51:06|##|            |
|A|*|Active / Actif|*|1604872|*|2018.06.25 15:12:35|##|       |
|D|*||*|CONNECTOR|*|2012.04.06 10:49:09|##|                   |
|I|*|Intermittent Leave|*|asdasda|*|2021.04.09 13:14:00|##|   |
|L|*|On Leave|*|asdasasd|*|2011.03.04 11:49:40|##|            |
|P|*|Paid Leave|*|asdasd|*|2011.03.04 11:49:56|##|            |
|T|*|Terminated / Terminé|*|1604872|*|2018.06.25 15:13:06|##||
|U|*||*|CONNECTOR|*|2012.06.16 09:04:14|##|                   |
|40|*|OLL|*|asdasdas|*|2019.01.21 14:07:00|##|                |
|40|*|OLL|*|asdasda|*|2019.01.21 14:18:00|##|                 |
|40|*|OLL|*|asdasdas|*|2019.01.21 14:20:00|##|                |
+                              -+

相关问题 更多 >