BigQuery to Redshift
Here is a simple Airflow DAG which exports data from Google BigQuery and ships it into an AWS Redshift cluster.
from datetime import datetime, timedelta

from airflow.contrib.operators.bigquery_to_gcs import \
    BigQueryToCloudStorageOperator
from airflow.contrib.operators.gcs_download_operator import \
    GoogleCloudStorageDownloadOperator
from airflow.models import DAG

from asp.plugins.operators import PostgresOperator, UploadFile

DAG_ID = "bq_2_rs"

args = {
    "start_date": datetime.utcnow()
}

with DAG(dag_id=DAG_ID,
         default_args=args,
         schedule_interval=None,
         dagrun_timeout=timedelta(minutes=60),
         max_active_runs=1) as dag:

    # Export the BigQuery table to Google Cloud Storage as Avro files.
    bq_to_gcp_avro = BigQueryToCloudStorageOperator(
        task_id='bq_to_gcs_avro',
        dag=dag,
        source_project_dataset_table='project:dataset.table',
        destination_cloud_storage_uris=[
            'gs://test-bucket/test-bq/data/part-*.avro'
        ],
        export_format='AVRO',
        bigquery_conn_id='gcp',
    )

    # Download the exported Avro file from GCS to the local filesystem.
    local_dl = GoogleCloudStorageDownloadOperator(
        task_id='gcs_download',
        dag=dag,
        bucket='test-bucket',  # must match the bucket written by the export above
        object='test-bq/data/part-000000000000.avro',
        filename='/tmp/data.avro',
        google_cloud_storage_conn_id='gcp'
    )
    local_dl.set_upstream(bq_to_gcp_avro)

    # Push the local file to S3 so Redshift can COPY it.
    upload = UploadFile(
        task_id='s3_upload',
        dag=dag,
        params={
            'source_path': '/tmp/data.avro',
            'output_path': 'test-bq/bq-data.avro',
            'output_connection': 's3.stories.bi',
        })
    upload.set_upstream(local_dl)

    # Create a staging table in Redshift and COPY the Avro file into it.
    load_to_rs = PostgresOperator(
        task_id='load_to_redshift',
        dag=dag,
        sql='''
            CREATE TABLE tmp_table (
                id text,
                days_to_departure text);

            copy tmp_table
            from 's3://bucket/data.avro'
            credentials 'aws_iam_role=arn:aws:iam::id:role/myRedshiftRole'
            format as avro 'auto';
        ''',
        postgres_conn_id='redshift'
    )
    load_to_rs.set_upstream(upload)
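The UploadFile and PostgresOperator tasks come from a custom asp.plugins.operators package that is not shown here. As a rough idea of what UploadFile might do, below is a minimal sketch built on Airflow's stock S3Hook; the use of params for configuration and the hard-coded bucket name are assumptions, not the actual plugin code.

# Hypothetical sketch of the UploadFile operator (the real asp.plugins.operators
# implementation is not shown in the post).
from airflow.hooks.S3_hook import S3Hook
from airflow.models import BaseOperator


class UploadFile(BaseOperator):
    """Upload a local file to S3 using the connection named in params."""

    def execute(self, context):
        # 'output_connection' is assumed to be an Airflow connection id for S3.
        hook = S3Hook(aws_conn_id=self.params['output_connection'])
        hook.load_file(
            filename=self.params['source_path'],
            key=self.params['output_path'],
            bucket_name='bucket',  # hypothetical; must match the s3:// path in the COPY
            replace=True,
        )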
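To run the DAG, the 'gcp', 's3.stories.bi' and 'redshift' connections have to exist in Airflow, and because schedule_interval is None it only starts when triggered by hand, for example with `airflow trigger_dag bq_2_rs` on the Airflow 1.x CLI. The COPY statement reads the file through the myRedshiftRole IAM role, so that role needs read access to the bucket the file was uploaded to.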