Twitter is a social media platform where users gather to share information and discuss trending world events/topics. Tons of data is generated daily through this platform.

To get data from Twitter, you need to connect to its API. Numerous libraries make it easy to connect to the Twitter API. You will also need Pandas, a Python library for data exploration and transformation.

Make sure your Airflow virtual environment is currently active. Inside the Airflow dags folder, create two files: extract.py and transform.py.

Contents of the extract.py file:

```python
import snscrape.modules.twitter as sntwitter
import pandas as pd

tweets_list = []

# The "since:" date in the original query was not recoverable; supply your own.
for i, tweet in enumerate(sntwitter.TwitterSearchScraper('Chatham House since:').get_items()):
    if i > 100:  # limit added for illustration
        break
    # The original field list was partially garbled; username is a representative choice.
    tweets_list.append([tweet.date, tweet.user.username, tweet.rawContent])

tweets_df = pd.DataFrame(tweets_list, columns=['date', 'username', 'content'])
```

Contents of the transform.py file:

```python
import pandas as pd
from airflow.providers.postgres.hooks.postgres import PostgresHook

def load_to_postgres(data: pd.DataFrame):
    # perform data cleaning and transformation

    # bulk_load reads a tab-delimited file from disk, so write the
    # DataFrame to a file first (the path here is illustrative)
    data.to_csv('/tmp/twitter_data.tsv', sep='\t', index=False, header=False)
    postgres_sql_upload = PostgresHook(postgres_conn_id="postgres_connection")
    postgres_sql_upload.bulk_load('twitter_etl_table', '/tmp/twitter_data.tsv')
```

## The Database

Airflow comes with a SQLite database by default. To store your data, you'll use PostgreSQL as a database instead. You should have PostgreSQL installed and running on your machine.

Install the libraries:

```bash
pip install psycopg2
```

If this fails, try installing the binary version:

```bash
pip install psycopg2-binary
```

Install the provider package for the Postgres database:

```bash
pip install apache-airflow-providers-postgres
```

## How to Set Up the DAG Script

Create a file named etl_pipeline.py inside the dags folder. Start by importing the different Airflow operators and defining the DAG:

```python
from airflow import DAG
from airflow.operators.empty import EmptyOperator  # import path reconstructed from the garbled source

dag = DAG(
    dag_id="etl_twitter_pipeline",
    description="A simple twitter ETL pipeline using Python, PostgreSQL and Apache Airflow",
    # ... schedule interval and other arguments ...
)
```

With a dag_id named 'etl_twitter_pipeline', this dag is scheduled to run every two minutes, as defined by the schedule interval.

Start the scheduler with this command:

```bash
airflow scheduler
```

Then start the web server with this command:

```bash
airflow webserver
```

Open the browser on localhost:8080 to view the UI. Search for a dag named 'etl_twitter_pipeline', and click on the toggle icon on the left to start the dag.

*(Image: Airflow UI showing created dags)*

## How to Set Up a Postgres Database Connection

You should already have apache-airflow-providers-postgres and psycopg2 (or psycopg2-binary) installed in your virtual environment. From the UI, navigate to Admin -> Connections. Click on the plus sign at the top left corner of your screen to add a new connection and specify the connection parameters. Click on Test to verify the connection to the database server. Once completed, scroll to the bottom of the screen and click on Save.

## Airflow DAG Executor

Inside the Airflow directory created in the virtual environment, open the airflow.cfg file in your text editor, locate the variable named sql_alchemy_conn, and set it to your PostgreSQL connection string. The Airflow executor is currently set to SequentialExecutor; change this to LocalExecutor:

```
executor = LocalExecutor
```
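The airflow.cfg changes described above might look like the following sketch. The connection string is purely illustrative, since the original value was not shown; substitute your own user, password, and database name.

```ini
; airflow.cfg (values illustrative -- substitute your own credentials)
sql_alchemy_conn = postgresql+psycopg2://<user>:<password>@localhost:5432/<db_name>
executor = LocalExecutor
```

The connection string follows the standard SQLAlchemy URI format that Airflow expects for its metadata database.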
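Putting the pieces together, the etl_pipeline.py DAG file might look like the sketch below. The task ids, the start date, and the wiring of the extract and transform callables are assumptions for illustration, not the article's exact code.

```python
# etl_pipeline.py -- illustrative sketch of the DAG definition
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator


def extract():
    # pull tweets with snscrape (see extract.py)
    ...


def transform_and_load():
    # clean the data and bulk-load it into Postgres (see transform.py)
    ...


with DAG(
    dag_id="etl_twitter_pipeline",
    description="A simple twitter ETL pipeline using Python, PostgreSQL and Apache Airflow",
    schedule_interval=timedelta(minutes=2),  # run every two minutes
    start_date=datetime(2023, 1, 1),         # illustrative start date
    catchup=False,
) as dag:
    start = EmptyOperator(task_id="start")
    extract_task = PythonOperator(task_id="extract", python_callable=extract)
    load_task = PythonOperator(task_id="transform_and_load", python_callable=transform_and_load)

    start >> extract_task >> load_task
```

This is pipeline configuration rather than standalone code: the scheduler picks the file up from the dags folder and runs the tasks on the declared interval.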
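The "data cleaning and transformation" step in transform.py is left open in the article. A minimal sketch of what it could do is shown below; the column names (`date`, `username`, `content`) and the specific cleaning steps are assumptions, not the article's code.

```python
import pandas as pd


def clean_tweets(df: pd.DataFrame) -> pd.DataFrame:
    # Normalize whitespace, drop duplicate tweets, and parse dates
    out = df.copy()
    out["content"] = out["content"].str.strip()
    out = out.drop_duplicates(subset=["content"])
    out["date"] = pd.to_datetime(out["date"])
    return out.reset_index(drop=True)


raw = pd.DataFrame({
    "date": ["2023-01-01", "2023-01-01", "2023-01-02"],
    "username": ["alice", "alice", "bob"],
    "content": ["  hello ", "hello", "world"],
})
cleaned = clean_tweets(raw)
print(cleaned["content"].tolist())  # ['hello', 'world']
```

Deduplicating on the tweet text after stripping whitespace collapses the two "hello" rows into one.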
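The bulk_load call in transform.py wraps Postgres COPY, which reads tab-delimited rows with no header. The snippet below demonstrates the serialization shape using an in-memory buffer; in the pipeline this would target a real file path, and the column names are assumed for illustration.

```python
import io

import pandas as pd

df = pd.DataFrame(
    [["2023-01-01", "alice", "hello world"]],
    columns=["date", "username", "content"],
)

# Tab-delimited, no header, no index: the layout bulk_load expects
buffer = io.StringIO()
df.to_csv(buffer, sep="\t", index=False, header=False)

# Reading it back confirms the round trip
restored = pd.read_csv(
    io.StringIO(buffer.getvalue()),
    sep="\t",
    header=None,
    names=["date", "username", "content"],
)
print(restored.loc[0, "username"])  # alice
```

Writing with `header=False` matters: a stray header row would otherwise be loaded into the table as data.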