diff --git a/.gitignore b/.gitignore
index 1e91853..f67c2e9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,6 +9,7 @@ venv
 data
 logs
 extracts
+exports
 plugins
 
 # dbt
diff --git a/airflow/dags/export_marts/dag.py b/airflow/dags/export_marts/dag.py
index 475fbc2..152fd5c 100644
--- a/airflow/dags/export_marts/dag.py
+++ b/airflow/dags/export_marts/dag.py
@@ -1,8 +1,6 @@
 import sqlalchemy
 import pandas as pd
-import boto3
-from io import StringIO
 from os import environ
 from airflow_operator import create_dag
 from airflow.decorators import dag,task
 
@@ -35,21 +33,18 @@ def export_marts():
         df = pd.read_sql(f"SELECT * FROM sagerx_dev.{mart};", con=connection)
         mart_dfs[mart] = df
 
-    access_key = environ.get("AWS_ACCESS_KEY_ID")
-    secret_key = environ.get("AWS_SECRET_ACCESS_KEY")
+    # get S3 destination from .env file, if any
     dest_bucket = environ.get("AWS_DEST_BUCKET")
 
-    s3_resource = boto3.resource(
-        's3',
-        aws_access_key_id= access_key,
-        aws_secret_access_key= secret_key
-    )
-
     for k in list(mart_dfs.keys()):
         print(f'putting {k}')
-        csv_buffer = StringIO()
-        mart_dfs[k].to_csv(csv_buffer, index=False)
-
-        s3_resource.Object(dest_bucket, f'{k}.csv').put(Body=csv_buffer.getvalue())
+        if dest_bucket != '': # if bucket is specified, write to bucket
+            #mart_dfs[k].to_csv(dest_bucket+f'/{k}.csv', index=False) # if you want CSV
+            mart_dfs[k].to_parquet(dest_bucket+f'/{k}.parquet', index=False)
+            #mart_dfs[k].to_csv('/opt/airflow/exports/'+f'{k}.csv', index=False) # if you want CSV
+            mart_dfs[k].to_parquet('/opt/airflow/exports/'+f'{k}.parquet', index=False)
+        else:
+            #mart_dfs[k].to_csv('/opt/airflow/exports/'+f'{k}.csv', index=False) # if you want CSV
+            mart_dfs[k].to_parquet('/opt/airflow/exports/'+f'{k}.parquet', index=False)
 
 export_marts()
diff --git a/airflow/requirements.txt b/airflow/requirements.txt
index f95c889..8f0255e 100644
--- a/airflow/requirements.txt
+++ b/airflow/requirements.txt
@@ -5,3 +5,4 @@ dbt-core
 dbt-postgres
 apache-airflow[google]
 bs4
+s3fs
\ No newline at end of file
diff --git a/docker-compose.yml b/docker-compose.yml
index 7b00e23..c7649c1 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,8 +1,7 @@
-version: "3.8"
 x-airflow-common: &airflow-common
   build:
     context: ./airflow
-  image: sagerx_airflow:v0.0.5 # versioning allows a rebuild of docker image where necessary
+  image: sagerx_airflow:v0.0.6 # versioning allows a rebuild of docker image where necessary
   networks:
     - airflow-dbt-network
   env_file:
@@ -26,6 +25,7 @@ x-airflow-common: &airflow-common
     - ./airflow/logs:/opt/airflow/logs
     - ./airflow/plugins:/opt/airflow/plugins
     - ./airflow/data:/opt/airflow/data
+    - ./airflow/exports:/opt/airflow/exports
     - ./airflow/config/airflow.cfg:/opt/airflow/airflow.cfg
     - ./dbt:/dbt
     - ./gcp.json:/opt/gcp.json
@@ -114,9 +114,6 @@ services:
     command: webserver
     environment:
       <<: *airflow-common-env
-      AWS_ACCESS_KEY_ID: ${ACCESS_KEY}
-      AWS_SECRET_ACCESS_KEY: ${SECRET_ACCESS_KEY}
-      AWS_DEST_BUCKET: ${DEST_BUCKET}
     ports:
       - 8001:8080
 
@@ -126,9 +123,6 @@ services:
     command: scheduler
     environment:
       <<: *airflow-common-env
-      AWS_ACCESS_KEY_ID: ${ACCESS_KEY}
-      AWS_SECRET_ACCESS_KEY: ${SECRET_ACCESS_KEY}
-      AWS_DEST_BUCKET: ${DEST_BUCKET}
 
 networks:
   airflow-dbt-network:
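
Reviewer note on airflow/dags/export_marts/dag.py: environ.get("AWS_DEST_BUCKET")
returns None, not '', when the variable is unset, so the new "dest_bucket != ''"
guard would take the bucket branch and raise a TypeError when concatenating the
path. Below is a minimal sketch of a hardened version, assuming the same
/opt/airflow/exports mount, an s3://-style bucket URL handled by the newly added
s3fs dependency, and a parquet engine such as pyarrow being available; export_mart
is a hypothetical helper for illustration, not part of the patch.

    from os import environ

    import pandas as pd

    EXPORT_DIR = "/opt/airflow/exports"  # matches the volume added in docker-compose.yml


    def export_mart(name: str, df: pd.DataFrame) -> None:
        """Write a mart locally, and also to S3 when a bucket is configured."""
        dest_bucket = environ.get("AWS_DEST_BUCKET")  # hypothetical value: "s3://my-bucket"

        # The local copy is written unconditionally, mirroring both branches
        # of the patch's if/else.
        df.to_parquet(f"{EXPORT_DIR}/{name}.parquet", index=False)

        if dest_bucket:  # truthy only for a non-empty, configured bucket
            # pandas hands the s3:// URL to s3fs, which resolves AWS
            # credentials from the standard environment/config chain.
            df.to_parquet(f"{dest_bucket}/{name}.parquet", index=False)

Letting s3fs handle s3:// URLs is also what allows the patch to drop boto3, the
StringIO buffering, and the explicit AWS credential wiring in docker-compose.yml:
credentials are picked up from the standard AWS environment rather than being
passed through compose variables.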