Kubernetes / Ops Notes

Deploying Airflow for Distributed Task Scheduling on K8S

Einic Yeo · November 27, 2021

1. Deployment Requirements

Apache Airflow has been tested with:

Name                   Main version (dev)    Stable version (2.1.4)
Python                 3.6, 3.7, 3.8, 3.9    3.6, 3.7, 3.8, 3.9
Kubernetes             1.20, 1.19, 1.18      1.20, 1.19, 1.18
PostgreSQL             9.6, 10, 11, 12, 13   9.6, 10, 11, 12, 13
MySQL                  5.7, 8                5.7, 8
SQLite                 3.15.0+               3.15.0+
MSSQL (Experimental)   2017, 2019            -

Note: MySQL 5.x either cannot run multiple schedulers or has limitations doing so; see the scheduler documentation. MariaDB is not tested or recommended.

Note: SQLite is used in Airflow tests. Do not use it in production. We recommend using the latest stable version of SQLite for local development.

PS: This article deploys the stable Airflow release 2.1.4 on Kubernetes 1.20.x with PostgreSQL 12.x, using the Helm chart.

2. Generating the Helm Chart Configuration

PS: Deploy with Helm 3.
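
You can confirm the client major version first; helm version --short is a standard flag:

# Check the Helm client version (should print v3.x)
$ helm version --short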

# Create the airflow namespace in Kubernetes
$ kubectl create namespace airflow

# Add the airflow charts repository
$ helm repo add apache-airflow https://airflow.apache.org

# Update the airflow repo index
$ helm repo update

# List all airflow chart versions (here we deploy chart 1.2.0, i.e. Airflow 2.1.4)
$ helm search repo apache-airflow/airflow -l

NAME                  	CHART VERSION	APP VERSION	DESCRIPTION
apache-airflow/airflow	1.3.0        	2.2.1      	The official Helm chart to deploy Apache Airflo...
apache-airflow/airflow	1.2.0        	2.1.4      	The official Helm chart to deploy Apache Airflo...
apache-airflow/airflow	1.1.0        	2.1.2      	The official Helm chart to deploy Apache Airflo...
apache-airflow/airflow	1.0.0        	2.0.2      	Helm chart to deploy Apache Airflow, a platform...

# Export the chart's values.yaml file
$ helm show values apache-airflow/airflow --version 1.2.0 > airflow_1.2.4_values.yaml
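
To double-check which Airflow version a given chart version ships before editing, you can inspect the chart metadata:

# Print chart metadata; appVersion should read 2.1.4 for chart 1.2.0
$ helm show chart apache-airflow/airflow --version 1.2.0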

3. Modifying the Airflow Configuration

3.1 Configure the Persistent Storage StorageClass

PS: This setup uses Alibaba Cloud NAS Extreme storage.

# Edit the StorageClass manifest
$ vim alicloud-nas-airflow-test.yaml

apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: alicloud-nas-airflow-test
mountOptions:
  - nolock,tcp,noresvport
  - vers=3
parameters:
  volumeAs: subpath
  server: "xxxxx.cn-beijing.extreme.nas.aliyuncs.com:/share/airflow/"
provisioner: nasplugin.csi.alibabacloud.com
reclaimPolicy: Retain

# Apply it to the cluster
$ kubectl apply -f alicloud-nas-airflow-test.yaml
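
A quick check that the StorageClass was registered:

# Verify the new StorageClass is listed
$ kubectl get storageclass alicloud-nas-airflow-test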

3.2 Configure the gitSshKey for the Airflow DAGs Repository

# Edit airflow-ssh-secret.yaml; first, add the SSH public key to the Git project repository
$ vim airflow-ssh-secret.yaml

apiVersion: v1
kind: Secret
metadata:
  name: airflow-ssh-secret
  namespace: airflow
data:
  # key needs to be gitSshKey
  gitSshKey: "<base64-encoded SSH private key>"

# Apply it to the cluster
$ kubectl apply -f airflow-ssh-secret.yaml
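
The gitSshKey value must be the private key encoded as a single base64 line. A minimal sketch, assuming a dedicated ed25519 deploy key (the file name airflow_dags_key is illustrative):

# Generate a dedicated key pair for git-sync
$ ssh-keygen -t ed25519 -f ./airflow_dags_key -N "" -C "airflow-git-sync"

# Add airflow_dags_key.pub to the Git repository as a read-only deploy key,
# then base64-encode the private key without line wrapping (GNU coreutils):
$ base64 -w 0 ./airflow_dags_key

# Alternatively, let kubectl do the base64 encoding for you:
$ kubectl create secret generic airflow-ssh-secret -n airflow \
    --from-file=gitSshKey=./airflow_dags_key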

3.3 Deploy PostgreSQL 12 with Docker

# Create the PostgreSQL data directory
$ mkdir /data/postgresql_data

# Create the compose file
$ vim docker-compose.yaml

version: "3"

services:
  airflow-postgres:
    image: postgres:12
    restart: always
    container_name: airflow-postgres
    environment:
      TZ: Asia/Shanghai
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: Airflow123
    volumes:
      - /data/postgresql_data:/var/lib/postgresql/data
    ports:
      - "5432:5432"

# Start the PostgreSQL container
$ docker-compose up -d
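
Because POSTGRES_DB is not set, the postgres image creates a database named after POSTGRES_USER (airflow), which matches the metadataConnection settings used below. A quick reachability check:

# Confirm the container is running and the airflow database exists
$ docker ps --filter name=airflow-postgres
$ docker exec -it airflow-postgres psql -U airflow -c '\l'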

3.4 Modify the airflow_1.2.4_values.yaml Configuration

PS: The airflow_1.2.4_values.yaml configuration used in this article needs three PVCs, for the redis, worker (only one worker is deployed here, though more are possible), and dags services.

uid: 50000
gid: 0
airflowHome: /opt/airflow
defaultAirflowRepository: apache/airflow
defaultAirflowTag: "2.1.4-python3.8"
airflowVersion: "2.1.4"
images:
  airflow:
    repository: ~
    tag: ~
    pullPolicy: IfNotPresent
  # To avoid images with user code, you can turn this to 'true' and
  # all the 'run-airflow-migrations' and 'wait-for-airflow-migrations' containers/jobs
  # will use the images from 'defaultAirflowRepository:defaultAirflowTag' values
  # to run and wait for DB migrations .
  useDefaultImageForMigration: false
  pod_template:
    repository: ~
    tag: ~
    pullPolicy: IfNotPresent
  flower:
    repository: ~
    tag: ~
    pullPolicy: IfNotPresent
  statsd:
    repository: apache/airflow
    tag: airflow-statsd-exporter-2021.04.28-v0.17.0
    pullPolicy: IfNotPresent
  redis:
    repository: redis
    tag: 6-buster
    pullPolicy: IfNotPresent
  pgbouncer:
    repository: apache/airflow
    tag: airflow-pgbouncer-2021.04.28-1.14.0
    pullPolicy: IfNotPresent
  pgbouncerExporter:
    repository: apache/airflow
    tag: airflow-pgbouncer-exporter-2021.09.22-0.12.0
    pullPolicy: IfNotPresent
  gitSync:
    repository: openweb/git-sync
    tag: latest
    pullPolicy: IfNotPresent
nodeSelector:
  label: test
affinity: {}
tolerations: []
labels: {}
ingress:
  # Enable ingress resource
  enabled: false
  # Configs for the Ingress of the web Service
  web:
    # Annotations for the web Ingress
    annotations: {}
    # The path for the web Ingress
    path: "/"
    # The pathType for the above path (used only with Kubernetes v1.19 and above)
    pathType: "ImplementationSpecific"
    # The hostname for the web Ingress (Deprecated - renamed to `ingress.web.hosts`)
    host: ""
    # The hostnames for the web Ingress
    hosts: []
    # The Ingress Class for the web Ingress (used only with Kubernetes v1.19 and above)
    ingressClassName: ""
    # configs for web Ingress TLS
    tls:
      # Enable TLS termination for the web Ingress
      enabled: false
      # the name of a pre-created Secret containing a TLS private key and certificate
      secretName: ""
    # HTTP paths to add to the web Ingress before the default path
    precedingPaths: []
    # Http paths to add to the web Ingress after the default path
    succeedingPaths: []
  # Configs for the Ingress of the flower Service
  flower:
    # Annotations for the flower Ingress
    annotations: {}
    # The path for the flower Ingress
    path: "/"
    # The pathType for the above path (used only with Kubernetes v1.19 and above)
    pathType: "ImplementationSpecific"
    # The hostname for the flower Ingress (Deprecated - renamed to `ingress.flower.hosts`)
    host: ""
    # The hostnames for the flower Ingress
    hosts: []
    # The Ingress Class for the flower Ingress (used only with Kubernetes v1.19 and above)
    ingressClassName: ""
    # configs for web Ingress TLS
    tls:
      # Enable TLS termination for the flower Ingress
      enabled: false
      # the name of a pre-created Secret containing a TLS private key and certificate
      secretName: ""
networkPolicies:
  # Enabled network policies
  enabled: false
airflowPodAnnotations: {}
airflowConfigAnnotations: {}
airflowLocalSettings: |-
  {{- if semverCompare ">=2.2.0" .Values.airflowVersion }}
  {{- if not (or .Values.webserverSecretKey .Values.webserverSecretKeySecretName) }}
  from airflow.www.utils import UIAlert
  DASHBOARD_UIALERTS = [
    UIAlert(
      'Usage of a dynamic webserver secret key detected. We recommend a static webserver secret key instead.'
      ' See the <a href='
      '"https://airflow.apache.org/docs/helm-chart/stable/production-guide.html#webserver-secret-key">'
      'Helm Chart Production Guide</a> for more details.',
      category="warning",
      roles=["Admin"],
      html=True,
    )
  ]
  {{- end }}
  {{- end }}
rbac:
  # Specifies whether RBAC resources should be created
  create: true
executor: "CeleryExecutor"
allowPodLaunching: true
env: []
secret: []
extraSecrets: {}
extraConfigMaps: {}
extraEnv: ~
extraEnvFrom: ~
data:
  # If secret names are provided, use those secrets
  metadataSecretName: ~
  resultBackendSecretName: ~
  brokerUrlSecretName: ~
  # Otherwise pass connection values in
  metadataConnection:
    user: airflow
    pass: Airflow123
    protocol: postgresql
    # Address of the PostgreSQL instance deployed above
    host: xxx.xxx.xxx.xxx
    port: 5432
    db: airflow
    sslmode: disable
  # resultBackendConnection defaults to the same database as metadataConnection
  resultBackendConnection: ~
  # or, you can use a different database
  # resultBackendConnection:
  #   user: postgres
  #   pass: postgres
  #   protocol: postgresql
  #   host: ~
  #   port: 5432
  #   db: postgres
  #   sslmode: disable
  # Note: brokerUrl can only be set during install, not upgrade
  brokerUrl: ~
fernetKey: ~
fernetKeySecretName: ~
webserverSecretKey: ~
webserverSecretKeySecretName: ~
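# (Note) Instead of chart-generated values, static keys can be supplied above.
# A common way to generate them (both commands are standard Python one-liners):
#   fernetKey:          python3 -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"
#   webserverSecretKey: python3 -c "import secrets; print(secrets.token_hex(16))"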
kerberos:
  enabled: false
  ccacheMountPath: /var/kerberos-ccache
  ccacheFileName: cache
  configPath: /etc/krb5.conf
  keytabPath: /etc/airflow.keytab
  principal: airflow@FOO.COM
  reinitFrequency: 3600
  config: |
    # This is an example config showing how you can use templating and how "example" config
    # might look like. It works with the test kerberos server that we are using during integration
    # testing at Apache Airflow (see `scripts/ci/docker-compose/integration-kerberos.yml` but in
    # order to make it production-ready you must replace it with your own configuration that
    # Matches your kerberos deployment. Administrators of your Kerberos instance should
    # provide the right configuration.
    [logging]
    default = "FILE:{{ template "airflow_logs_no_quote" . }}/kerberos_libs.log"
    kdc = "FILE:{{ template "airflow_logs_no_quote" . }}/kerberos_kdc.log"
    admin_server = "FILE:{{ template "airflow_logs_no_quote" . }}/kadmind.log"
    [libdefaults]
    default_realm = FOO.COM
    ticket_lifetime = 10h
    renew_lifetime = 7d
    forwardable = true
    [realms]
    FOO.COM = {
      kdc = kdc-server.foo.com
      admin_server = admin_server.foo.com
    }
workers:
  # Number of airflow celery workers in StatefulSet
  replicas: 1
  # Command to use when running Airflow workers (templated).
  command: ~
  # Args to use when running Airflow workers (templated).
  args:
    - "bash"
    - "-c"
    # The format below is necessary to get `helm lint` happy
    # Install extra Python packages via pip so the airflow image does not need rebuilding
    - |-
      pip install mq_http_sdk PyMySQL -i https://pypi.tuna.tsinghua.edu.cn/simple/;
      exec \
      airflow {{ semverCompare ">=2.0.0" .Values.airflowVersion | ternary "celery worker" "worker" }}
  # Update Strategy when worker is deployed as a StatefulSet
  updateStrategy: ~
  # Update Strategy when worker is deployed as a Deployment
  strategy:
    rollingUpdate:
      maxSurge: "100%"
      maxUnavailable: "50%"
  # Create ServiceAccount
  serviceAccount:
    # Specifies whether a ServiceAccount should be created
    create: true
    # The name of the ServiceAccount to use.
    # If not set and create is true, a name is generated using the release name
    name: ~
    # Annotations to add to worker kubernetes service account.
    annotations: {}
  # Allow KEDA autoscaling.
  # Persistence.enabled must be set to false to use KEDA.
  keda:
    enabled: false
    namespaceLabels: {}
    # How often KEDA polls the airflow DB to report new scale requests to the HPA
    pollingInterval: 5
    # How many seconds KEDA will wait before scaling to zero.
    # Note that HPA has a separate cooldown period for scale-downs
    cooldownPeriod: 30
    # Minimum number of workers created by keda
    minReplicaCount: 0
    # Maximum number of workers created by keda
    maxReplicaCount: 10
  persistence:
    # Enable persistent volumes
    enabled: true
    # Volume size for worker StatefulSet
    size: 50Gi
    # If using a custom storageClass, pass name ref to all statefulSets here
    # The NAS StorageClass defined earlier
    storageClassName: alicloud-nas-airflow-test
    # Execute init container to chown log directory.
    # This is currently only needed in kind, due to usage
    # of local-path provisioner.
    fixPermissions: false
  kerberosSidecar:
    # Enable kerberos sidecar
    enabled: false
    resources: {}
    #  limits:
    #   cpu: 100m
    #   memory: 128Mi
    #  requests:
    #   cpu: 100m
    #   memory: 128Mi
  resources:
    limits:
      cpu: 2
      memory: 4096Mi
    requests:
      cpu: 100m
      memory: 128Mi
 
  # Grace period for tasks to finish after SIGTERM is sent from kubernetes
  terminationGracePeriodSeconds: 600
  # This setting tells kubernetes that its ok to evict
  # when it wants to scale a node down.
  safeToEvict: true
  # Launch additional containers into worker.
  # Note: If used with KubernetesExecutor, you are responsible for signaling sidecars to exit when the main
  # container finishes so Airflow can continue the worker shutdown process!
  extraContainers: []
  # Add additional init containers into workers.
  extraInitContainers: []
  # Mount additional volumes into worker.
  extraVolumes: []
  extraVolumeMounts: []
  # Select certain nodes for airflow worker pods.
  nodeSelector:  {}
  affinity:
    podAntiAffinity:
      preferredDuringSchedulingIgnoredDuringExecution:
        - weight: 100
          podAffinityTerm:
            labelSelector:
              matchLabels:
                component: worker
            topologyKey: "kubernetes.io/hostname"
  tolerations: []
  # hostAliases to use in worker pods.
  # See:
  # https://kubernetes.io/docs/concepts/services-networking/add-entries-to-pod-etc-hosts-with-host-aliases/
  hostAliases: []
  # - ip: "127.0.0.2"
  #   hostnames:
  #   - "test.hostname.one"
  # - ip: "127.0.0.3"
  #   hostnames:
  #   - "test.hostname.two"
  logGroomerSidecar:
    # Command to use when running the Airflow worker log groomer sidecar (templated).
    command: ~
    # Args to use when running the Airflow worker log groomer sidecar (templated).
    args: ["bash", "/clean-logs"]
    # Number of days to retain logs
    retentionDays: 7
    resources:
      limits:
        cpu: 100m
        memory: 128Mi
      requests:
        cpu: 100m
        memory: 128Mi
scheduler:
  # If the scheduler stops heartbeating for 5 minutes (10*30s) kill the
  # scheduler and let Kubernetes restart it
  livenessProbe:
    initialDelaySeconds: 10
    timeoutSeconds: 5
    failureThreshold: 10
    periodSeconds: 30
  # Airflow 2.0 allows users to run multiple schedulers,
  # However this feature is only recommended for MySQL 8+ and Postgres
  replicas: 1
  # Command to use when running the Airflow scheduler (templated).
  command: ~
  # Args to use when running the Airflow scheduler (templated).
  # Install extra Python packages via pip so the airflow image does not need rebuilding
  args: ["bash", "-c", "pip install mq_http_sdk PyMySQL -i https://pypi.tuna.tsinghua.edu.cn/simple/; exec airflow scheduler"]
  # Update Strategy when scheduler is deployed as a StatefulSet
  # (when using LocalExecutor and workers.persistence)
  updateStrategy: ~
  # Update Strategy when scheduler is deployed as a Deployment
  # (when not using LocalExecutor and workers.persistence)
  strategy: ~
  # Create ServiceAccount
  serviceAccount:
    # Specifies whether a ServiceAccount should be created
    create: true
    # The name of the ServiceAccount to use.
    # If not set and create is true, a name is generated using the release name
    name: ~
    # Annotations to add to scheduler kubernetes service account.
    annotations: {}
  # Scheduler pod disruption budget
  podDisruptionBudget:
    enabled: true
    # PDB configuration
    config:
      maxUnavailable: 1
  resources:
    limits:
      cpu: 1
      memory: 1024Mi
    requests:
      cpu: 100m
      memory: 128Mi
  # This setting tells kubernetes that its ok to evict
  # when it wants to scale a node down.
  safeToEvict: true
  # Launch additional containers into scheduler.
  extraContainers: []
  # Add additional init containers into scheduler.
  extraInitContainers: []
  # Mount additional volumes into scheduler.
  extraVolumes: []
  extraVolumeMounts: []
  # Select certain nodes for airflow scheduler pods.
  nodeSelector:  {}
  affinity:
    podAntiAffinity:
      preferredDuringSchedulingIgnoredDuringExecution:
        - weight: 100
          podAffinityTerm:
            labelSelector:
              matchLabels:
                component: scheduler
            topologyKey: "kubernetes.io/hostname"
  tolerations: []
  logGroomerSidecar:
    # Whether to deploy the Airflow scheduler log groomer sidecar.
    enabled: true
    # Command to use when running the Airflow scheduler log groomer sidecar (templated).
    command: ~
    # Args to use when running the Airflow scheduler log groomer sidecar (templated).
    args: ["bash", "/clean-logs"]
    # Number of days to retain logs
    retentionDays: 7
    resources:
      limits:
        cpu: 100m
        memory: 128Mi
      requests:
        cpu: 100m
        memory: 128Mi
createUserJob:
  # Annotations on the create user job pod
  annotations: {}
  # jobAnnotations are annotations on the create user job
  jobAnnotations: {}
  # Create ServiceAccount
  serviceAccount:
    # Specifies whether a ServiceAccount should be created
    create: true
    # The name of the ServiceAccount to use.
    # If not set and create is true, a name is generated using the release name
    name: ~
    # Annotations to add to create user kubernetes service account.
    annotations: {}
  nodeSelector:  {}
  affinity: {}
  tolerations: []
migrateDatabaseJob:
  # Annotations on the database migration pod
  annotations: {}
  # jobAnnotations are annotations on the database migration job
  jobAnnotations: {}
  # Create ServiceAccount
  serviceAccount:
    # Specifies whether a ServiceAccount should be created
    create: true
    # The name of the ServiceAccount to use.
    # If not set and create is true, a name is generated using the release name
    name: ~
    # Annotations to add to migrate database job kubernetes service account.
    annotations: {}
  # Launch additional containers into database migration job
  extraContainers: []
  nodeSelector: {}
  affinity: {}
  tolerations: []
webserver:
  allowPodLogReading: true
  livenessProbe:
    initialDelaySeconds: 15
    timeoutSeconds: 30
    failureThreshold: 20
    periodSeconds: 5
  readinessProbe:
    initialDelaySeconds: 15
    timeoutSeconds: 30
    failureThreshold: 20
    periodSeconds: 5
  # Number of webservers
  replicas: 1
  # Command to use when running the Airflow webserver (templated).
  command: ~
  # Args to use when running the Airflow webserver (templated).
  args: ["bash", "-c", "exec airflow webserver"]
  # Create ServiceAccount
  serviceAccount:
    # Specifies whether a ServiceAccount should be created
    create: true
    # The name of the ServiceAccount to use.
    # If not set and create is true, a name is generated using the release name
    name: ~
    # Annotations to add to webserver kubernetes service account.
    annotations: {}
  # Allow overriding Update Strategy for Webserver
  strategy: ~
  # Additional network policies as needed (Deprecated - renamed to `webserver.networkPolicy.ingress.from`)
  extraNetworkPolicies: []
  networkPolicy:
    ingress:
      # Peers for webserver NetworkPolicy ingress
      from: []
      # Ports for webserver NetworkPolicy ingress (if `from` is set)
      ports:
        - port: airflow-ui
  resources:
    limits:
      cpu: 2
      memory: 4096Mi
    requests:
      cpu: 100m
      memory: 128Mi
  # Create initial user.
  defaultUser:
    enabled: true
    role: Admin
    username: admin
    email: admin@example.com
    firstName: admin
    lastName: user
    password: Airflow123
  # Launch additional containers into webserver.
  extraContainers: []
  # Add additional init containers into webserver.
  extraInitContainers: []
  # Mount additional volumes into webserver.
  extraVolumes: []
  extraVolumeMounts: []
  # This string (can be templated) will be mounted into the Airflow Webserver as a custom
  # webserver_config.py. You can bake a webserver_config.py in to your image instead.
  webserverConfig: ~
  # webserverConfig: |
  #   from airflow import configuration as conf
  #   # The SQLAlchemy connection string.
  #   SQLALCHEMY_DATABASE_URI = conf.get('core', 'SQL_ALCHEMY_CONN')
  #   # Flask-WTF flag for CSRF
  #   CSRF_ENABLED = True
  service:
    type: ClusterIP
    ## service annotations
    annotations: {}
    ports:
      - name: airflow-ui
        port: "{{ .Values.ports.airflowUI }}"
    # To change the port used to access the webserver:
    # ports:
    #   - name: airflow-ui
    #     port: 80
    #     targetPort: airflow-ui
    # To only expose a sidecar, not the webserver directly:
    # ports:
    #   - name: only_sidecar
    #     port: 80
    #     targetPort: 8888
    loadBalancerIP: ~
    ## Limit load balancer source ips to list of CIDRs
    # loadBalancerSourceRanges:
    #   - "10.123.0.0/16"
    loadBalancerSourceRanges: []
  # Select certain nodes for airflow webserver pods.
  nodeSelector: {}
  affinity:
    podAntiAffinity:
      preferredDuringSchedulingIgnoredDuringExecution:
        - weight: 100
          podAffinityTerm:
            labelSelector:
              matchLabels:
                component: webserver
            topologyKey: "kubernetes.io/hostname"
  tolerations: []
triggerer:
  # Number of airflow triggerers in the deployment
  replicas: 1
  # Command to use when running Airflow triggerers (templated).
  command: ~
  # Args to use when running Airflow triggerer (templated).
  args: ["bash", "-c", "exec airflow triggerer"]
  # Update Strategy for triggerers
  strategy:
    rollingUpdate:
      maxSurge: "100%"
      maxUnavailable: "50%"
  # If the triggerer stops heartbeating for 5 minutes (10*30s) kill the
  # triggerer and let Kubernetes restart it
  livenessProbe:
    initialDelaySeconds: 10
    timeoutSeconds: 5
    failureThreshold: 10
    periodSeconds: 30
  # Create ServiceAccount
  serviceAccount:
    # Specifies whether a ServiceAccount should be created
    create: true
    # The name of the ServiceAccount to use.
    # If not set and create is true, a name is generated using the release name
    name: ~
    # Annotations to add to triggerer kubernetes service account.
    annotations: {}
  resources:
    limits:
      cpu: 1
      memory: 1024Mi
    requests:
      cpu: 100m
      memory: 128Mi
  # Grace period for triggerer to finish after SIGTERM is sent from kubernetes
  terminationGracePeriodSeconds: 60
  # This setting tells kubernetes that its ok to evict
  # when it wants to scale a node down.
  safeToEvict: true
  # Launch additional containers into triggerer.
  extraContainers: []
  # Add additional init containers into triggerers.
  extraInitContainers: []
  # Mount additional volumes into triggerer.
  extraVolumes: []
  extraVolumeMounts: []
  # Select certain nodes for airflow triggerer pods.
  nodeSelector: {}
  affinity:
    podAntiAffinity:
      preferredDuringSchedulingIgnoredDuringExecution:
        - weight: 100
          podAffinityTerm:
            labelSelector:
              matchLabels:
                component: triggerer
            topologyKey: "kubernetes.io/hostname"
  tolerations: []
flower:
  # Enable flower.
  # If True, and using CeleryExecutor/CeleryKubernetesExecutor, will deploy flower app.
  enabled: true
  # Command to use when running flower (templated).
  command: ~
  # Args to use when running flower (templated).
  args:
    - "bash"
    - "-c"
    # The format below is necessary to get `helm lint` happy
    - |-
      exec \
      airflow {{ semverCompare ">=2.0.0" .Values.airflowVersion | ternary "celery flower" "flower" }}
  # Additional network policies as needed (Deprecated - renamed to `flower.networkPolicy.ingress.from`)
  extraNetworkPolicies: []
  networkPolicy:
    ingress:
      # Peers for flower NetworkPolicy ingress
      from: []
      # Ports for flower NetworkPolicy ingress (if ingressPeers is set)
      ports:
        - port: flower-ui
  resources:
    limits:
      cpu: 1
      memory: 1024Mi
    requests:
      cpu: 100m
      memory: 128Mi
  # Create ServiceAccount
  serviceAccount:
    # Specifies whether a ServiceAccount should be created
    create: true
    # The name of the ServiceAccount to use.
    # If not set and create is true, a name is generated using the release name
    name: ~
    # Annotations to add to worker kubernetes service account.
    annotations: {}
  # A secret containing the connection
  secretName: ~
  # Else, if username and password are set, create secret from username and password
  username: ~
  password: ~
  service:
    type: ClusterIP
    ## service annotations
    annotations: {}
    ports:
      - name: flower-ui
        port: "{{ .Values.ports.flowerUI }}"
    # To change the port used to access flower:
    # ports:
    #   - name: flower-ui
    #     port: 8080
    #     targetPort: flower-ui
    loadBalancerIP: ~
    ## Limit load balancer source ips to list of CIDRs
    # loadBalancerSourceRanges:
    #   - "10.123.0.0/16"
    loadBalancerSourceRanges: []
  # Launch additional containers into the flower pods.
  extraContainers: []
  # Mount additional volumes into the flower pods.
  extraVolumes: []
  # Select certain nodes for airflow flower pods.
  nodeSelector: {}
  affinity: {}
  tolerations: []
statsd:
  enabled: true
  # Create ServiceAccount
  serviceAccount:
    # Specifies whether a ServiceAccount should be created
    create: true
    # The name of the ServiceAccount to use.
    # If not set and create is true, a name is generated using the release name
    name: ~
    # Annotations to add to worker kubernetes service account.
    annotations: {}
  # Additional network policies as needed
  extraNetworkPolicies: []
  resources:
    limits:
      cpu: 1
      memory: 1024Mi
    requests:
      cpu: 100m
      memory: 128Mi
  service:
    extraAnnotations: {}
  # Select certain nodes for statsd pods.
  nodeSelector: {}
  affinity: {}
  tolerations: []
  # Additional mappings for statsd exporter.
  extraMappings: []
  uid: 65534
pgbouncer:
  # Enable PgBouncer
  enabled: false
  # Create ServiceAccount
  serviceAccount:
    # Specifies whether a ServiceAccount should be created
    create: true
    # The name of the ServiceAccount to use.
    # If not set and create is true, a name is generated using the release name
    name: ~
    # Annotations to add to worker kubernetes service account.
    annotations: {}
  # Additional network policies as needed
  extraNetworkPolicies: []
  # Pool sizes
  metadataPoolSize: 10
  resultBackendPoolSize: 5
  # Maximum clients that can connect to PgBouncer (higher = more file descriptors)
  maxClientConn: 100
  # supply the name of existing secret with pgbouncer.ini and users.txt defined
  # you can load them to a k8s secret like the one below
  #  apiVersion: v1
  #  kind: Secret
  #  metadata:
  #    name: pgbouncer-config-secret
  #  data:
  #     pgbouncer.ini: <base64_encoded pgbouncer.ini file content>
  #     users.txt: <base64_encoded users.txt file content>
  #  type: Opaque
  #
  #  configSecretName: pgbouncer-config-secret
  #
  configSecretName: ~
  # PgBouncer pod disruption budget
  podDisruptionBudget:
    enabled: false
    # PDB configuration
    config:
      maxUnavailable: 1
  # Limit the resources to PgBouncer.
  # When you specify the resource request the k8s scheduler uses this information to decide which node to
  # place the Pod on. When you specify a resource limit for a Container, the kubelet enforces those limits so
  # that the running container is not allowed to use more of that resource than the limit you set.
  # See: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/
  # Example:
  #
  # resource:
  #   limits:
  #     cpu: 100m
  #     memory: 128Mi
  #   requests:
  #     cpu: 100m
  #     memory: 128Mi
  resources: {}
  service:
    extraAnnotations: {}
  # https://www.pgbouncer.org/config.html
  verbose: 0
  logDisconnections: 0
  logConnections: 0
  sslmode: "prefer"
  ciphers: "normal"
  ssl:
    ca: ~
    cert: ~
    key: ~
  # Add extra PgBouncer ini configuration in the databases section:
  # https://www.pgbouncer.org/config.html#section-databases
  extraIniMetadata: ~
  extraIniResultBackend: ~
  # Add extra general PgBouncer ini configuration: https://www.pgbouncer.org/config.html
  extraIni: ~
  # Select certain nodes for PgBouncer pods.
  nodeSelector: {}
  affinity: {}
  tolerations: []
  uid: 65534
  metricsExporterSidecar:
    resources: {}
    #  limits:
    #   cpu: 100m
    #   memory: 128Mi
    #  requests:
    #   cpu: 100m
    #   memory: 128Mi
redis:
  enabled: true
  terminationGracePeriodSeconds: 600
  # Create ServiceAccount
  serviceAccount:
    # Specifies whether a ServiceAccount should be created
    create: true
    # The name of the ServiceAccount to use.
    # If not set and create is true, a name is generated using the release name
    name: ~
    # Annotations to add to worker kubernetes service account.
    annotations: {}
  persistence:
    # Enable persistent volumes
    enabled: true
    # Volume size for worker StatefulSet
    size: 1Gi
    # If using a custom storageClass, pass name ref to all statefulSets here
    storageClassName: alicloud-nas-airflow-test
  resources:
    limits:
     cpu: 1
     memory: 1024Mi
    requests:
     cpu: 100m
     memory: 128Mi
  # If set use as redis secret. Make sure to also set data.brokerUrlSecretName value.
  passwordSecretName: ~
  # Else, if password is set, create secret with it,
  # Otherwise a new password will be generated on install
  # Note: password can only be set during install, not upgrade.
  password: ~
  # This setting tells kubernetes that its ok to evict
  # when it wants to scale a node down.
  safeToEvict: true
  # Select certain nodes for redis pods.
  nodeSelector: {}
  affinity: {}
  tolerations: []
registry:
  secretName: ~
  # Example:
  # connection:
  #   user: ~
  #   pass: ~
  #   host: ~
  #   email: ~
  connection: {}
elasticsearch:
  # Enable elasticsearch task logging
  enabled: false
  # A secret containing the connection
  secretName: ~
  # Or an object representing the connection
  # Example:
  # connection:
  #   user: ~
  #   pass: ~
  #   host: ~
  #   port: ~
  connection: {}
ports:
  flowerUI: 5555
  airflowUI: 8080
  workerLogs: 8793
  redisDB: 6379
  statsdIngest: 9125
  statsdScrape: 9102
  pgbouncer: 6543
  pgbouncerScrape: 9127
quotas: {}
limits: []
cleanup:
  enabled: false
  # Run every 15 minutes
  schedule: "*/15 * * * *"
  # Command to use when running the cleanup cronjob (templated).
  command: ~
  # Args to use when running the cleanup cronjob (templated).
  args: ["bash", "-c", "exec airflow kubernetes cleanup-pods --namespace={{ .Release.Namespace }}"]
  # Select certain nodes for airflow cleanup pods.
  nodeSelector: {}
  affinity: {}
  tolerations: []
  # Create ServiceAccount
  serviceAccount:
    # Specifies whether a ServiceAccount should be created
    create: true
    # The name of the ServiceAccount to use.
    # If not set and create is true, a name is generated using the release name
    name: ~
    # Annotations to add to cleanup cronjob kubernetes service account.
    annotations: {}
postgresql:
  enabled: false
  postgresqlPassword: postgres
  postgresqlUsername: postgres
config:
  core:
    dags_folder: '{{ include "airflow_dags" . }}'
    # This is ignored when used with the official Docker image
    load_examples: 'False'
    executor: '{{ .Values.executor }}'
    # For Airflow 1.10, backward compatibility; moved to [logging] in 2.0
    colored_console_log: 'False'
    remote_logging: '{{- ternary "True" "False" .Values.elasticsearch.enabled }}'
    default_timezone: 'Asia/Shanghai'
    dags_are_paused_at_creation: 'True'
    load_default_connections: 'False'
    execute_tasks_new_python_interpreter: 'False'
    donot_pickle: 'True'
    dagbag_import_timeout: 30.0
    dagbag_import_error_tracebacks: 'True'
    dagbag_import_error_traceback_depth: 2
    dag_file_processor_timeout: 50
    unit_test_mode: 'False'
    enable_xcom_pickling: 'False'
    dag_run_conf_overrides_params: 'True'
    dag_discovery_safe_mode: 'True'
    default_task_retries: 0
    min_serialized_dag_update_interval: 30
    min_serialized_dag_fetch_interval: 10
    max_num_rendered_ti_fields_per_task: 30
    check_slas: 'True'
    xcom_backend: 'airflow.models.xcom.BaseXCom'
    lazy_load_plugins: 'True'
    lazy_discover_providers: 'True'
    hide_sensitive_var_conn_fields: 'True'
  # Authentication backend used for the experimental API
  api:
    auth_backend: airflow.api.auth.backend.deny_all
    enable_experimental_api: 'False'
    maximum_page_limit: 100
    fallback_page_limit: 100
  logging:
    remote_logging: '{{- ternary "True" "False" .Values.elasticsearch.enabled }}'
    colored_console_log: 'False'
  metrics:
    statsd_on: '{{ ternary "True" "False" .Values.statsd.enabled }}'
    statsd_port: 9125
    statsd_prefix: airflow
    statsd_host: '{{ printf "%s-statsd" .Release.Name }}'
  webserver:
    enable_proxy_fix: 'True'
    # For Airflow 1.10
    rbac: 'True'
    default_ui_timezone: 'Asia/Shanghai'
    # Generated with: python3 -c 'import secrets; print(secrets.token_hex(16))'
    secret_key: '23c4ab3609baef45a61232e808f51a84'
  email:
    email_backend: 'airflow.utils.email.send_email_smtp'
    email_conn_id: 'smtp_default'
    default_email_on_retry: 'True'
    default_email_on_failure: 'True' 
  smtp:
    smtp_host: 'mail.example.com'
    smtp_starttls: 'False'
    smtp_ssl: 'False'
    smtp_user: 'user'
    smtp_password: 'password'
    smtp_port: 25
    smtp_mail_from: 'airflow@example.com'
    smtp_timeout: 30
    smtp_retry_limit: 5
  celery:
    # The default is 16
    worker_concurrency: 8
  scheduler:
    # statsd params included for Airflow 1.10 backward compatibility; moved to [metrics] in 2.0
    statsd_on: '{{ ternary "True" "False" .Values.statsd.enabled }}'
    statsd_port: 9125
    statsd_prefix: airflow
    statsd_host: '{{ printf "%s-statsd" .Release.Name }}'
    # `run_duration` included for Airflow 1.10 backward compatibility; removed in 2.0.
    run_duration: 41460
  elasticsearch:
    json_format: 'True'
    log_id_template: "{dag_id}_{task_id}_{execution_date}_{try_number}"
  elasticsearch_configs:
    max_retries: 3
    timeout: 30
    retry_timeout: 'True'
  kerberos:
    keytab: '{{ .Values.kerberos.keytabPath }}'
    reinit_frequency: '{{ .Values.kerberos.reinitFrequency }}'
    principal: '{{ .Values.kerberos.principal }}'
    ccache: '{{ .Values.kerberos.ccacheMountPath }}/{{ .Values.kerberos.ccacheFileName }}'
  celery_kubernetes_executor:
    kubernetes_queue: 'kubernetes'
  kubernetes:
    namespace: '{{ .Release.Namespace }}'
    airflow_configmap: '{{ include "airflow_config" . }}'
    airflow_local_settings_configmap: '{{ include "airflow_config" . }}'
    pod_template_file: '{{ include "airflow_pod_template_file" . }}/pod_template_file.yaml'
    worker_container_repository: '{{ .Values.images.airflow.repository | default .Values.defaultAirflowRepository }}'
    worker_container_tag: '{{ .Values.images.airflow.tag | default .Values.defaultAirflowTag }}'
    multi_namespace_mode: '{{ if .Values.multiNamespaceMode }}True{{ else }}False{{ end }}'
multiNamespaceMode: false
podTemplate: ~
dags:
  persistence:
    # Enable persistent volume for storing dags
    enabled: true
    # Volume size for dags
    size: 1Gi
    # If using a custom storageClass, pass name here
    storageClassName: alicloud-nas-airflow-test
    # access mode of the persistent volume
    accessMode: ReadWriteOnce
    ## the name of an existing PVC to use
    existingClaim:
  gitSync:
    enabled: true
    # git repo clone url
    # ssh examples ssh://git@github.com/apache/airflow.git
    # git@github.com:apache/airflow.git
    # https example: https://github.com/apache/airflow.git
    # The airflow DAGs repository source
    repo: git@xxxxx.com/airflow-dags.git
    branch: master
    rev: HEAD
    depth: 1
    # the number of consecutive failures allowed before aborting
    maxFailures: 0
    # subpath within the repo where dags are located
    # should be "" if dags are at repo root
    subPath: "dags"
    # if your repo needs a user name password
    # you can load them to a k8s secret like the one below
    #   ---
    #   apiVersion: v1
    #   kind: Secret
    #   metadata:
    #     name: git-credentials
    #   data:
    #     GIT_SYNC_USERNAME: <base64_encoded_git_username>
    #     GIT_SYNC_PASSWORD: <base64_encoded_git_password>
    # and specify the name of the secret below
    #
    # credentialsSecret: git-credentials
    #
    #
    # If you are using an ssh clone url, you can load
    # the ssh private key to a k8s secret like the one below
    #   ---
    #   apiVersion: v1
    #   kind: Secret
    #   metadata:
    #     name: airflow-ssh-secret
    #   data:
    #     # key needs to be gitSshKey
    #     gitSshKey: <base64_encoded_data>
    # and specify the name of the secret below
    # Defined earlier in this article
    sshKeySecret: airflow-ssh-secret
    #
    # If you are using an ssh private key, you can additionally
    # specify the content of your known_hosts file, example:
    #
    # knownHosts: |
    #    <host1>,<ip1> <key1>
    #    <host2>,<ip2> <key2>
    # interval between git sync attempts in seconds
    wait: 60
    containerName: git-sync
    uid: 65533
    extraVolumeMounts: []
    env: []
    resources:
      limits:
       cpu: 100m
       memory: 128Mi
      requests:
       cpu: 100m
       memory: 128Mi
logs:
  persistence:
    # Enable persistent volume for storing logs
    enabled: false
    # Volume size for logs
    size: 20Gi
    # If using a custom storageClass, pass name here
    storageClassName: alicloud-nas-airflow-test
    ## the name of an existing PVC to use
    existingClaim:

4. Deploying Airflow

# Install Airflow for the first time
$ helm install airflow apache-airflow/airflow --namespace airflow --version 1.2.0 -f airflow_1.2.4_values.yaml

# For later configuration changes, use the following command
$ helm upgrade --install airflow apache-airflow/airflow --namespace airflow --version 1.2.0 -f airflow_1.2.4_values.yaml
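
After installing, watch the components come up and confirm the three PVCs described in section 3.4 are bound (pod names assume the release name airflow):

# Watch the scheduler, webserver, worker, triggerer, flower, redis and statsd pods
$ kubectl get pods -n airflow -w

# The redis, worker and dags PVCs should all report Bound
$ kubectl get pvc -n airflow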

5. Configure the Airflow Ingress Nginx Entry Point

# Create the ingress-nginx manifest
$ vim airflow-ingress.yaml

apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: airflow
  namespace: airflow
  annotations:
    kubernetes.io/ingress.class: nginx
    nginx.ingress.kubernetes.io/ssl-redirect: "false"
    nginx.ingress.kubernetes.io/proxy-connect-timeout: "60"
    nginx.ingress.kubernetes.io/proxy-read-timeout: "60"
    nginx.ingress.kubernetes.io/proxy-send-timeout: "60"
spec:
  rules:
  - host: "airflow.example.com"
    http:
      paths:
      - path: /
        pathType: Prefix
        backend:
          service:
            name: airflow-webserver
            port:
              number: 8080

# Apply it to the cluster
$ kubectl apply -f airflow-ingress.yaml
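
Assuming airflow.example.com resolves to the ingress-nginx controller (via DNS or a local hosts entry), the webserver's /health endpoint makes a quick smoke test:

# Verify the Ingress was created
$ kubectl get ingress -n airflow

# Expect HTTP 200 from the Airflow webserver through the ingress
$ curl -I http://airflow.example.com/health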
