<p>Edit: As of version 2.12.0, Beam comes with new <code>fileio</code> transforms that let you read CSV files without having to reimplement a source. You can do it like this:</p>
<pre><code>import csv
import io
import sys

import apache_beam as beam
from apache_beam.io import fileio


def get_csv_reader(readable_file):
    # You can return whichever kind of reader you want here:
    # a DictReader, or a normal csv.reader.
    if sys.version_info >= (3, 0):
        # ReadableFile.open() returns a binary stream; csv needs text.
        return csv.reader(io.TextIOWrapper(readable_file.open()))
    else:
        return csv.reader(readable_file.open())


with beam.Pipeline(...) as p:
    content_pc = (p
                  | fileio.MatchFiles("/my/file/name")
                  | fileio.ReadMatches()
                  | beam.Reshuffle()  # Useful if you expect many matches
                  | beam.FlatMap(get_csv_reader))
</code></pre>
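<p>The comment in <code>get_csv_reader</code> mentions <code>csv.DictReader</code> as an alternative. Below is a minimal sketch of that variant, assuming Python 3, UTF-8 input and a file whose first row is a header. <code>FakeReadableFile</code> is a hypothetical stand-in for <code>fileio.ReadableFile</code> that only mimics its <code>open()</code> method, so the reader function can be tried out without running a pipeline.</p>
<pre><code>import csv
import io


def get_dict_reader(readable_file):
    # Each data row becomes a dict keyed by the column names in the header row.
    # ReadableFile.open() yields bytes, so wrap it as text for the csv module
    # (UTF-8 encoding is an assumption here).
    return csv.DictReader(
        io.TextIOWrapper(readable_file.open(), encoding="utf-8", newline=""))


class FakeReadableFile:
    """Hypothetical stand-in for fileio.ReadableFile; only open() is mimicked."""

    def __init__(self, data):
        self._data = data

    def open(self):
        return io.BytesIO(self._data)


rows = list(get_dict_reader(FakeReadableFile(b"name,age\nalice,30\nbob,25\n")))
print(rows)
</code></pre>
<p>In the pipeline, <code>beam.FlatMap(get_dict_reader)</code> would then emit one dict per data row instead of one list per line. Note that every value is still a string, so any type conversion is up to you.</p>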
<p>I recently wrote a test for this in Apache Beam. You can take a look at it in <a href="https://github.com/apache/beam/blob/v2.12.0/sdks/python/apache_beam/io/fileio_test.py#L128-L148" rel="nofollow noreferrer">the Github repository</a>.</p>
<hr/>
<p><strong>The old answer relied on reimplementing a source. This is no longer the main recommended way of doing it :)</strong></p>
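<p>The idea is to have a source that returns parsed CSV rows. You can do this by subclassing the <code>FileBasedSource</code> class to include CSV parsing. In particular, the <code>read_records</code> function would look something like this:</p>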
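<pre><code>import csv
import io
from apache_beam.io.filebasedsource import FileBasedSource

class MyCsvFileSource(FileBasedSource):
    def __init__(self, file_pattern):
        # read_records ignores range_tracker, so disable splitting
        super().__init__(file_pattern, splittable=False)

    def read_records(self, file_name, range_tracker):
        # open_file returns a binary stream; the csv module needs text
        with self.open_file(file_name) as f:
            for rec in csv.reader(io.TextIOWrapper(f, newline="")):
                yield rec
</code></pre>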
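<p>Stripped of the Beam plumbing, the body of <code>read_records</code> just decodes a binary stream and hands it to <code>csv.reader</code>. The Beam-free sketch below pulls that step into a plain generator so it can be tested on its own. The helper name <code>iter_csv_rows</code> and the UTF-8 default are hypothetical choices for illustration. Passing <code>newline=""</code> follows the <code>csv</code> module documentation's advice, so that newlines inside quoted fields are interpreted correctly.</p>
<pre><code>import csv
import io


def iter_csv_rows(binary_stream, encoding="utf-8"):
    # Hypothetical helper: decode bytes to text and let csv split the records.
    # newline="" leaves line endings untouched so csv can handle quoted newlines.
    text = io.TextIOWrapper(binary_stream, encoding=encoding, newline="")
    for row in csv.reader(text):
        yield row


data = b'id,comment\n1,"first line\nsecond line"\n2,plain\n'
rows = list(iter_csv_rows(io.BytesIO(data)))
for row in rows:
    print(row)
</code></pre>
<p>The quoted field spanning two lines comes back as a single value, which is why line-oriented splitting (for example, reading the file with a text-line source and calling <code>split(",")</code>) is not a safe substitute for a real CSV parser.</p>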