How to use PostgresHook in Airflow
First, I imported the hook, together with the operators used in this DAG:
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
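Then I defined the function that takes the CSV file and loads it into Postgres:
def _store_user():
    # Look up the Airflow connection whose conn_id is 'postgres'
    hook = PostgresHook(postgres_conn_id='postgres')
    # Stream the local CSV file into the users table via COPY ... FROM stdin
    hook.copy_expert(
        sql="COPY users FROM stdin WITH DELIMITER as ','",
        filename='/tmp/processed_user.csv'
    )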
Here, PostgresHook(postgres_conn_id='postgres') looks up the Postgres connection we created in the Airflow UI (connection ID 'postgres'). The resulting hook object is stored in the hook variable.
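The connection created in the Airflow UI is a set of form fields: Host, Schema, Login, Password and Port. As far as I can tell from the Postgres provider, PostgresHook passes these fields to psycopg2 roughly as sketched below. Note that for Postgres connections the Schema field holds the database name, not a Postgres schema.

This sketch uses only the standard library. The `AirflowConnectionFields` class, the `to_psycopg2_kwargs` function and the example values are hypothetical stand-ins, not Airflow APIs, and connection extras such as sslmode are ignored.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class AirflowConnectionFields:
    """Hypothetical stand-in for the fields of the 'postgres' connection form."""
    host: str
    schema: str  # for Postgres connections this holds the *database* name
    login: str
    password: str
    port: Optional[int] = None


def to_psycopg2_kwargs(conn: AirflowConnectionFields) -> dict:
    """Map connection fields to psycopg2.connect() keyword arguments.

    Approximates how PostgresHook builds its arguments; extras are not
    handled in this sketch.
    """
    kwargs = {
        "host": conn.host,
        "user": conn.login,
        "password": conn.password,
        "dbname": conn.schema,  # Schema field -> database name
        "port": conn.port,
    }
    # This sketch drops unset values so psycopg2 would use its own defaults
    return {k: v for k, v in kwargs.items() if v is not None}


if __name__ == "__main__":
    # Hypothetical example values, not taken from the original setup
    conn = AirflowConnectionFields(
        host="localhost", schema="airflow", login="airflow", password="airflow"
    )
    print(to_psycopg2_kwargs(conn))
```

If the hook connects to the wrong database, the Schema field of the connection is the first place to check.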
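We then used hook.copy_expert(...) to load the CSV data. It opens the file and streams its contents to the COPY users FROM stdin statement, much like psql's \copy command does. A hook is Airflow's interface to an external system. PostgresHook wraps the connection details and the psycopg2 client, so Python code can do things that PostgresOperator cannot. PostgresOperator only executes SQL statements, so it cannot stream a local file into a table.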
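The statement only sets DELIMITER, not FORMAT csv, so PostgreSQL reads the file in its text format rather than its CSV format. This has a few consequences:

- A header row would be read as data.
- Double quotes have no special meaning.
- `\N` stands for NULL.
- A backslash, newline, carriage return or comma inside a value must be escaped with a backslash.

The helpers `escape_copy_text` and `to_copy_line` below are a hypothetical sketch, not part of Airflow or psycopg2. They show how one row could be written so that this COPY statement accepts it. The example row is also made up, since the real columns of the users table are not shown here.

```python
from typing import Iterable, Optional

NULL_MARKER = r"\N"  # PostgreSQL's default NULL string in text format


def escape_copy_text(value: Optional[object], delimiter: str = ",") -> str:
    """Escape one value for COPY ... FROM STDIN in the default text format."""
    if value is None:
        return NULL_MARKER
    text = str(value)
    # Escape backslashes first so the escapes added below are not doubled
    text = text.replace("\\", "\\\\")
    # Newline and carriage return become the \n and \r escape sequences
    text = text.replace("\n", "\\n").replace("\r", "\\r")
    # A delimiter inside a value must be preceded by a backslash
    text = text.replace(delimiter, "\\" + delimiter)
    return text


def to_copy_line(row: Iterable[Optional[object]], delimiter: str = ",") -> str:
    """Join escaped values into one newline-terminated COPY text line."""
    return delimiter.join(escape_copy_text(v, delimiter) for v in row) + "\n"


if __name__ == "__main__":
    # Hypothetical user row; the actual users table columns may differ
    line = to_copy_line(["Ada", "Lovelace", "London, UK", None])
    print(line, end="")
```

Alternatively, a file written by a standard CSV writer (quoted fields, optional header) can be loaded by switching the statement to CSV format. That would be a change to the SQL, not to the hook call.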
Lastly, we ran _store_user as a task by passing it to a PythonOperator:
store_user = PythonOperator(
    task_id='store_user',
    python_callable=_store_user
)
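Note that python_callable receives the function object itself (_store_user, without parentheses), and Airflow calls it only when the task runs. Writing python_callable=_store_user() would instead run the load while the DAG file is being parsed. It would then pass the return value (None) to the operator, which PythonOperator rejects because it is not callable.

The toy class below is a hypothetical, heavily simplified stand-in for PythonOperator that only illustrates this deferred call. It is not Airflow's implementation, and it raises TypeError where Airflow raises its own exception type.

```python
from typing import Any, Callable, List


class ToyPythonOperator:
    """Hypothetical, simplified stand-in for PythonOperator (illustration only)."""

    def __init__(self, task_id: str, python_callable: Callable[[], Any]):
        # Like the real operator, refuse anything that is not callable
        if not callable(python_callable):
            raise TypeError("python_callable must be callable")
        self.task_id = task_id
        self.python_callable = python_callable

    def execute(self) -> Any:
        # The function only runs here, i.e. when the task is executed
        return self.python_callable()


calls: List[str] = []


def _store_user():
    # Stand-in for the real load; it only records that it ran
    calls.append("store_user ran")


if __name__ == "__main__":
    store_user = ToyPythonOperator(task_id="store_user", python_callable=_store_user)
    print(calls)  # still empty: defining the task does not call the function
    store_user.execute()
    print(calls)
```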