<p>Here is a different approach. I would try the following:</p>
<ol>
<li>Convert all <code>csv</code> files to <code>parquet</code> (see this <a href="https://stackoverflow.com/a/63257331/4819376">answer</a> if needed) and fix the <code>dtypes</code> while doing so. At a minimum, convert the date column:</li>
</ol>
<pre class="lang-py prettyprint-override"><code>df['Date'] = df['Date'].astype("M8[ns]")
</code></pre>
<p>or</p>
<pre class="lang-py prettyprint-override"><code>df['Date'] = pd.to_datetime(df['Date'])
</code></pre>
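<p>The conversion in step 1 could be sketched roughly as below. This is only an illustration under assumptions: the helper names <code>clean_dtypes</code> and <code>csv_to_parquet</code> are made up for this example, the source folder name is hypothetical, and writing parquet needs an engine such as <code>pyarrow</code> or <code>fastparquet</code> to be installed.</p>

```python
from pathlib import Path

import pandas as pd


def clean_dtypes(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy with the 'Date' column parsed as datetime64.

    Hypothetical helper; the column name 'Date' is taken from the answer.
    """
    out = df.copy()
    out["Date"] = pd.to_datetime(out["Date"])
    return out


def csv_to_parquet(src_dir: str, dst_dir: str) -> None:
    """Write one parquet file into dst_dir for every CSV file in src_dir.

    Hypothetical helper; requires a parquet engine (pyarrow or fastparquet).
    """
    dst = Path(dst_dir)
    dst.mkdir(parents=True, exist_ok=True)
    for csv_path in sorted(Path(src_dir).glob("*.csv")):
        df = clean_dtypes(pd.read_csv(csv_path))
        # Keep the original file stem so files map one-to-one.
        df.to_parquet(dst / f"{csv_path.stem}.parquet", index=False)
```

<p>Assuming the raw files sit in a folder called <code>raw</code> (a placeholder name), <code>csv_to_parquet("raw", "processed")</code> would fill the <code>processed</code> folder used in the next step.</p>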
<ol start="2">
<li>Repartition by <code>Sender</code>. I assume all the parquet files are in the <code>processed</code> folder.</li>
</ol>
<pre class="lang-py prettyprint-override"><code>import dask.dataframe as dd
df = dd.read_parquet('processed')
df.to_parquet('processed2', partition_on='Sender')
</code></pre>
<ol start="3">
<li><p>Each <code>Sender=username</code> folder now contains many files; you should merge them into a single file.</p>
</li>
<li><p>Now you can write a function to apply to each <code>Sender=username</code>:</p>
</li>
</ol>
<pre class="lang-py prettyprint-override"><code>import pandas as pd

def fun(df):
    # Order one sender's shipments chronologically
    df = df.sort_values("Date")
    df["Day Since Prev Shipment"] = df["Date"].diff().dt.days
    df["Day Since First Shipment"] = (df["Date"] - df["Date"].min()).dt.days
    df["Cumulative Quantity"] = df["Quantity"].cumsum()
    df["Quantity difference"] = df["Quantity"].diff()
    # Earliest shipment date per recipient
    grp = df.groupby("Recipient")["Date"].min().reset_index(name="First Shipment")
    df = pd.merge(df, grp, how="left", on="Recipient")
    # 1 when the row falls on that recipient's first shipment date, else 0
    df["First Shipment"] = (df["Date"] == df["First Shipment"]).astype("int8")
    return df
</code></pre>
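<p>Steps 3 and 4 could be tied together along the following lines. This is a sketch, not a definitive implementation: <code>sender_folders</code> and <code>merge_and_apply</code> are hypothetical helper names, the output folder is a placeholder, and it assumes each sender's data fits in memory and that sender names are valid file names. Because the partition column is stored in the folder name rather than in the files, the sketch adds <code>Sender</code> back by hand.</p>

```python
from pathlib import Path
from typing import Callable, Iterator, Tuple

import pandas as pd


def sender_folders(root: str) -> Iterator[Tuple[str, Path]]:
    """Yield (sender, folder) for every 'Sender=<name>' folder under root."""
    for folder in sorted(Path(root).glob("Sender=*")):
        if folder.is_dir():
            yield folder.name.split("=", 1)[1], folder


def merge_and_apply(
    root: str,
    out_dir: str,
    transform: Callable[[pd.DataFrame], pd.DataFrame],
) -> None:
    """Merge each sender's part files, apply transform, write one file each.

    Hypothetical helper; requires a parquet engine (pyarrow or fastparquet).
    """
    out = Path(out_dir)
    out.mkdir(parents=True, exist_ok=True)
    for sender, folder in sender_folders(root):
        parts = [pd.read_parquet(p) for p in sorted(folder.glob("*.parquet"))]
        if not parts:
            continue  # nothing to merge for this sender
        df = pd.concat(parts, ignore_index=True)
        # The partition value lives in the folder name, not in the files.
        df["Sender"] = sender
        transform(df).to_parquet(out / f"{sender}.parquet", index=False)
```

<p>With the function above, a call such as <code>merge_and_apply("processed2", "result", fun)</code> would write one file per sender, where <code>result</code> is just a placeholder folder name.</p>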