BigQuery to Redshift

Here is a simple Airflow DAG that exports data from Google BigQuery and loads it into an AWS Redshift cluster. The pipeline exports the table to Google Cloud Storage as Avro, downloads one part file locally, re-uploads it to S3, and finishes with a Redshift COPY.

from datetime import datetime, timedelta

from airflow.contrib.operators.bigquery_to_gcs import \
    BigQueryToCloudStorageOperator
from airflow.contrib.operators.gcs_download_operator import \
    GoogleCloudStorageDownloadOperator
from airflow.models import DAG
from asp.plugins.operators import PostgresOperator, UploadFile

DAG_ID = "bq_2_rs"

args = {
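    # NOTE: a static start_date is usually recommended; utcnow() is acceptable here
    # only because the DAG is triggered manually (schedule_interval=None).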
    "start_date": datetime.utcnow()
}

with DAG(dag_id=DAG_ID,
         default_args=args,
         schedule_interval=None,
         dagrun_timeout=timedelta(minutes=60),
         max_active_runs=1) as dag:

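    # Step 1: export the BigQuery table to Google Cloud Storage as Avro part files.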
    bq_to_gcp_avro = BigQueryToCloudStorageOperator(
        task_id='bq_to_gcs_avro',
        dag=dag,
        source_project_dataset_table='project:dataset.table',
        destination_cloud_storage_uris=[
            'gs://test-bucket/test-bq/data/part-*.avro'
        ],
        export_format='AVRO',
        bigquery_conn_id='gcp',
    )

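    # Step 2: download one exported Avro part file from GCS to the local filesystem.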
    local_dl = GoogleCloudStorageDownloadOperator(
        task_id='gcs_download',
        dag=dag,
        bucket='test-bucket',  # must match the export destination bucket above
        object='test-bq/data/part-000000000000.avro',
        filename='/tmp/data.avro',
        google_cloud_storage_conn_id='gcp'
    )

    local_dl.set_upstream(bq_to_gcp_avro)

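    # Step 3: push the local Avro file to S3 using the custom UploadFile operator.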
    upload = UploadFile(
        task_id='s3_upload',
        dag=dag,
        params={
            'source_path': '/tmp/data.avro',
            'output_path': 'test-bq/bq-data.avro',
            'output_connection': 's3.stories.bi',
        })

    upload.set_upstream(local_dl)

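    # Step 4: create the staging table and COPY the Avro file from S3 into Redshift.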
    load_to_rs = PostgresOperator(
        task_id='load_to_redshift',
        dag=dag,
        sql='''
-- recreate the staging table so the DAG can be re-run
DROP TABLE IF EXISTS tmp_table;
CREATE TABLE tmp_table (
    id text,
    days_to_departure text);

-- load the Avro file written by the s3_upload task (the bucket name is a placeholder)
copy tmp_table from 's3://bucket/test-bq/bq-data.avro'
credentials 'aws_iam_role=arn:aws:iam::id:role/myRedshiftRole'
format as avro 'auto';
        ''',
        postgres_conn_id='redshift'
    )
    load_to_rs.set_upstream(upload)
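
The UploadFile operator comes from a project-specific plugin (asp.plugins.operators), so it is not part of stock Airflow. As a minimal sketch of what such an operator could look like, assuming it simply pushes a local file to S3 with Airflow's S3Hook (the class body, the params contract, and the bucket name below are assumptions, not the plugin's actual code):

from airflow.hooks.S3_hook import S3Hook
from airflow.models import BaseOperator


class UploadFile(BaseOperator):
    """Minimal stand-in: copy a local file to S3."""

    def execute(self, context):
        # params carries source_path, output_path and output_connection,
        # matching how the s3_upload task above is configured.
        hook = S3Hook(aws_conn_id=self.params['output_connection'])
        hook.load_file(
            filename=self.params['source_path'],
            key=self.params['output_path'],
            bucket_name='my-staging-bucket',  # assumed; the real bucket is configured in the plugin
            replace=True,
        )

Whatever the real operator does, the bucket it writes to must be readable by the IAM role referenced in the COPY statement, and that role has to be attached to the Redshift cluster.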