使用无服务器概念使用数据流库。

dataflows-serverless的Python项目详细描述


#dataflows无服务器

dataflows无服务器允许使用无服务器概念的[dataflows](https://github.com/datahq/dataflows)库。

*在kubernetes上运行,通过在需要时启动和停止群集来节省成本。
*支持并行处理适当的处理步骤(例如独立处理每行的步骤)。
*使用标准数据流库在本地开发和测试处理流。

###创建一个流

使用[dataflows]库(https://github.com/datahq/dataflows)在本地开发一个流。

网址:

````
从数据流导入服务器流,无服务器步骤
从数据流导入转储到路径
导入请求

url=['http://httpbin.org/get?page={}。范围(100)内页的格式(page)


def download(row):
print('downloading{}.format(url))
row['content']=str(requests.get(row['url']).json())

serverlessflow(({'url':url,'content':'}用于url中的url),
serverlessflow(download),
dump_to_path('url_contents').serverless().process()
```

save it as`flow.py`,install the dataflows serverless package and run the flow locally,如果没有无服务器:

```
pip3安装无服务器数据流
python3 flow.py
````

以下是使用google kubernetes引擎创建集群的推荐方法

尝试使用[google cloud shell](https://cloud.google.com/shell/)这使得过程更加简单。
*[install google cloud sdk](https://cloud.google.com/sdk/docs/downloads interactive)
*安装kubectl:
*`gcloud components install kubectl`
*使用应用程序默认凭据(ADC):
*`gcloud auth application default login`
*连接到现有的数据流群集,如果不存在,则创建一个新的群集:
*`$(dataflows_serverless_bin)/gke_connect_或_create.sh<;google_project_id>;`
*您可以为创建的集群配置一些附加参数,运行不带任何参数的"gke_connect"或"u create"脚本以查看可用参数。





minikube是在虚拟机上本地运行的kubernetes群集:

*install[minikube](https://kubernetes.io/docs/tasks/tools/install minikube/)
*启动minikube集群:
*`minikube start`
*切换到minikube上下文:
*`kubectl config use context minikube`
*创建数据流命名空间:
*`kubectl create ns datafows`
*设置默认情况下使用此命名空间的当前上下文:
*`kubectl config set context minikube--namespace=dataflows`

\步骤。

```
python3 flow.py--serverless--secondaries=10--output datadir=url-contents
`````

Docker Images,后续的流会运行得更快。

如果你使用的是Google Kubernetes引擎,完成后不要忘记删除集群:

````
$(dataflows懔serverless懔bin)/gke懔cleanup.sh<;Google懔project懔id>;
```

高级用法

Dockerfile添加了python映像库:

```
来自orihoch/dataflows serverless:9
运行apk add--update--no cache zlib dev jpeg dev&;pip3 install helph
```

生成并推送修改后的映像。如果图像已经存在,kubernetes不会提取它,因此,请确保修改您构建的每个映像上的标记。

数据

您可以提供输入数据,这些数据将在作业启动之前复制,所有输入数据应位于当前工作目录的相对路径中提供大数据和更高级的数据初始化您可以提供一个数据初始化容器。

下面的示例演示如何使用google云存储。

google/cloud sdk
entrypoint["bash"、"-c"、"\
mkdir-p/exports/data/&;\
gcloud--project=google嫒project嫒u id auth activate service account--key file=/secrets/service-account.json&;\
gsutil-m cp-r gs://bucket-name/data//exports/
"]
````

映像需要凭据才能访问Google存储,下面的脚本创建一个服务帐户,用于对Google云进行身份验证,
相关服务帐户密钥和kubernetes secret

```
谷歌项目ID=
服务帐户名称=
机密名称=
gcloud--project=$google项目ID IAM服务帐户创建$service帐户名称&;\
gcloud projects添加iam策略绑定$google_project_id\
--成员"服务帐户:${service_account_name}@${google_project_id}.iam.gserviceaccount.com"\
--角色"角色/storage.objectadmin"&;\
gcloud iam service accounts keys create secret-service-account.json--iam account${service-account-name}@${google-project-id}.iam.gserviceaccount.com&;\
kubectl create secret generic${secret-name}--from file=service account.json=secret service account.json
`````

可能需要修改流以从正确的目录获取输入数据。

"data")运行无服务器流:

````

数据服务器正在运行,为--nfs uuid=参数提供一个唯一值:

```
python3 flow.py--serverless--secondaries=10--output datadir=url戥contents\
--data init image=your username/data init--data init secret=${secret戥name}\
--nfs uuid=my nfs
````

本案,您必须确保每个数据服务器上都运行一个主作业。

清理可能有问题但是,由于nfs的动态特性和使用情况,应该使用以下命令:

````
kubectl delete jobs--all&;kubectl delete all--all
```

相关pod,例如:

```
kubectl exec-it primary_pod_name/entrypoint.sh
kubectl exec-it secondary_1_pod_name/entrypoint.sh
kubectl exec-it secondary_2_pod_name/entrypoint.sh
```

"version.txt"中的版本

&;docker push orihoch/dataflows serverless:v$(cat version.txt)`

pypi:`python setup.py sdist&;tween upload dist/dataflows\u serverless-$(cat version.txt).tar.gz`

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
如何获得@sign,以便在Java代码中键入@override?   java Facebook登录不起作用,我忘了什么?   json如何在java中从MongoDB Atlas获取特定字段?   java如何在Android上的JNI中实现委托/协议(在iOS上)?   java为什么这个循环多次的程序在循环后有一个“println”时需要花费时间?   java无法使组合框正常工作   JavaCQ5。如何为作者显示列表发布者?版本5.5   java我可以要求泛型参数具有泛型参数吗?   JavaOKHTTP无法获取整个JSON   数组Java:用字符减去字符意味着什么?   java为什么Eclipse content assist无法从部分方法名生成方法存根?   java使用线程在Android Studio中一次加载一个多位图   java遍历二叉树并返回一个值