Component Gateway with DataprocOperator on Airflow



In GCP, installing and running the JupyterHub component is fairly straightforward from the UI or with a gcloud command. I am trying to script the process with Airflow and the DataprocClusterCreateOperator; here is an extract of the DAG:

from airflow.contrib.operators import dataproc_operator

# CLUSTER_NAME and PROJECT_ID are defined elsewhere in the DAG file
create_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create-' + CLUSTER_NAME,
        cluster_name=CLUSTER_NAME,
        project_id=PROJECT_ID,
        num_workers=3,
        num_masters=1,
        master_machine_type='n1-standard-2',
        worker_machine_type='n1-standard-2',
        master_disk_size=100,
        worker_disk_size=100,
        storage_bucket='test-dataproc-jupyter', 
        region='europe-west4', 
        zone='europe-west4-a',
        auto_delete_ttl=21600, 
        optional_components=['JUPYTER', 'ANACONDA']
    )
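
For reference, this is roughly the gcloud equivalent I am trying to reproduce, where the component gateway is a single flag (a sketch; substitute your own cluster name):

gcloud dataproc clusters create CLUSTER_NAME \
    --region europe-west4 --zone europe-west4-a \
    --num-workers 3 \
    --master-machine-type n1-standard-2 --worker-machine-type n1-standard-2 \
    --master-boot-disk-size 100GB --worker-boot-disk-size 100GB \
    --bucket test-dataproc-jupyter \
    --max-age 6h \
    --optional-components JUPYTER,ANACONDA \
    --enable-component-gateway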

However, I cannot manage to specify the needed enable-component-gateway parameter. Looking at the source code, the parameter does not seem to be expected, neither in the deprecated nor in the last stable operator.

I know the REST API offers endpointConfig.enableHttpPortAccess, but I would rather use the official operator. Does anyone know how to achieve this?
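
For context, this is the fragment of the clusters.create request body in question, written here as a Python dict (the field names come from the Dataproc v1 REST API):

# The REST field the operator does not expose, as a Python dict
endpoint_config_fragment = {
    'config': {
        'endpointConfig': {
            'enableHttpPortAccess': True  # what --enable-component-gateway sets
        }
    }
}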


1 Answer

EDIT: fix for composer-1.8.3 and airflow-1.10.3

In Airflow 1.10.3, the cluster config cannot be passed in from outside. However, we can subclass the cluster create operator and override the config creation. This also lets us set optional components, a parameter missing in that Airflow version.

from airflow.contrib.operators.dataproc_operator import DataprocClusterCreateOperator

class CustomDataprocClusterCreateOperator(DataprocClusterCreateOperator):

    def __init__(self, *args, **kwargs):
        super(CustomDataprocClusterCreateOperator, self).__init__(*args, **kwargs)

    def _build_cluster_data(self):
        # Let the stock operator build the cluster config, then patch in
        # the fields it does not expose.
        cluster_data = super(CustomDataprocClusterCreateOperator, self)._build_cluster_data()
        cluster_data['config']['endpointConfig'] = {
            'enableHttpPortAccess': True  # enables the Component Gateway
        }
        cluster_data['config']['softwareConfig']['optionalComponents'] = ['JUPYTER', 'ANACONDA']
        return cluster_data

# Start the Dataproc cluster
dataproc_cluster = CustomDataprocClusterCreateOperator(
    task_id='create-' + CLUSTER_NAME, 
    cluster_name=CLUSTER_NAME,
    project_id=PROJECT_ID,
    num_workers=3,
    num_masters=1,
    master_machine_type='n1-standard-2',
    worker_machine_type='n1-standard-2',
    master_disk_size=100,
    worker_disk_size=100,
    storage_bucket='test-dataproc-jupyter', 
    region='europe-west4', 
    zone='europe-west4-a',
    auto_delete_ttl=21600, 
    dag=dag
)

Original answer, for Airflow 1.10.7

While not optimal, you can build the cluster data structure yourself instead of letting Airflow's ClusterGenerator do it. This should work on the latest version (1.10.7):

cluster = {
  'clusterName': CLUSTER_NAME,
  'config': {
    'gceClusterConfig': {
      'zoneUri': 'europe-west4-a'
    },
    'masterConfig': {
      'numInstances': 1,
      'machineTypeUri': 'n1-standard-2',
      'diskConfig': {
        'bootDiskSizeGb': 100
      },
    },
    'workerConfig': {
      'numInstances': 3,
      'machineTypeUri': 'n1-standard-2',
      'diskConfig': {
        'bootDiskSizeGb': 100
      },
    },
    'softwareConfig': {
      'optionalComponents': [
        'ANACONDA',
        'JUPYTER'
      ]
    },
    'lifecycleConfig': {
      'autoDeleteTtl': '21600s'
    },
    'endpointConfig': {
      'enableHttpPortAccess': True
    }
  },
  'projectId': PROJECT_ID
}
# Start the Dataproc cluster
dataproc_cluster = DataprocClusterCreateOperator(
    task_id='create-' + CLUSTER_NAME,
    project_id=PROJECT_ID,
    num_workers=3,
    region='europe-west4',
    zone='europe-west4-a',
    cluster=cluster,
    dag=dag
)
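
Once the cluster is up, you can check that the gateway actually got enabled; when enableHttpPortAccess took effect, the describe output lists the component UI URLs under config.endpointConfig.httpPorts (a sketch, assuming gcloud is configured for the project):

gcloud dataproc clusters describe CLUSTER_NAME --region europe-west4 \
    --format 'yaml(config.endpointConfig)'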

If you are using a different Airflow version, please specify it.

You can also vote for the bug I opened: AIRFLOW-6432
