#!/bin/bash
#
# Incremental PostgreSQL-to-PostgreSQL loader driven by control tables.
#
# Reads connection settings from $HOME/etl/conf/serverCredentials-postgres.txt
# (key=value lines; blank lines and lines starting with '#' are ignored).
# For every row of trx_ctl_jobrun with process_flag=0, in ascending order of
# dependant_on, it:
#   1. reads the job definition (query, id/timestamp columns, batch size,
#      watermarks) from trx_ctl_jobrun,
#   2. extracts rows above the stored last_run_id from the source database in
#      batches of `transactions` ids (0 or NULL means one single batch),
#   3. writes each batch to $HOME/etl/data/<job_id>.csv and \copy-loads it into
#      the job's target table,
#   4. advances last_run_id/last_run_date in trx_ctl_jobrun, and
#   5. records timing and row counts in trx_job_log_status.
#
# Requires: bash >= 4 (associative arrays), psql, GNU sed.

# Set here rather than in the shebang so it also applies to `bash script.sh`.
set -e

# Print a message on stderr and abort.
die() {
  printf '%s\n' "$*" >&2
  exit 1
}

# Directories
homeDir=$HOME/etl
dataDir=$homeDir/data
backupDir=$homeDir/data/old
configDir=$homeDir/conf
input_file=$configDir/serverCredentials-postgres.txt

# A missing file would otherwise go unnoticed: the exit status of a process
# substitution is not seen by `set -e`.
[[ -r "$input_file" ]] || die "cannot read credentials file: $input_file"

declare -A credentials
# `|| [[ -n "$key" ]]` keeps a final line that has no trailing newline.
while IFS='=' read -r key value || [[ -n "$key" ]]; do
  [[ -n "$key" ]] || continue   # a line starting with '=' has no usable key
  credentials["$key"]=$value
done < <(sed -e '/^\s*$/ d' -e '/^#/ d' "$input_file")

# Show the loaded settings under their real names; passwords are masked so
# they do not end up in terminals or log files.
for name in \
  source_user source_password source_host source_db source_protocol \
  source_server target_user target_password target_host target_db \
  target_protocol target_server; do
  case "$name" in
    *_password) shown='********' ;;
    *) shown=${credentials[$name]} ;;
  esac
  printf '%s: %s\n' "$name" "$shown"
done

source_user=${credentials[source_user]}
source_password=${credentials[source_password]}
source_host=${credentials[source_host]}
source_db=${credentials[source_db]}
source_sch=${credentials[source_sch]}
# The protocol/server settings are loaded but not referenced below.
source_protocol=${credentials[source_protocol]}
source_server=${credentials[source_server]}

target_user=${credentials[target_user]}
target_password=${credentials[target_password]}
target_host=${credentials[target_host]}
target_db=${credentials[target_db]}
target_sch=${credentials[target_sch]}
target_protocol=${credentials[target_protocol]}
target_server=${credentials[target_server]}

control_sch='control'

# psql command lines, kept as arrays so the connection URI stays one argument
# (it contains '?', which is a glob character when expanded unquoted).
#   -X  do not read ~/.psqlrc     -A  unaligned output     -t  tuples only
# NOTE(review): the password is part of the URI and therefore visible in the
# process list, and special characters in it are not URL-encoded; consider
# PGPASSWORD or ~/.pgpass.
control_con=(psql -X -A -t "postgresql://$target_user:$target_password@$target_host/$target_db?options=--search_path%3D$control_sch")
trx_con=(psql -X -A -t "postgresql://$target_user:$target_password@$target_host/$target_db?options=--search_path%3D$target_sch")
source_con=(psql -X -A -t "postgresql://$source_user:$source_password@$source_host/$source_db?options=--search_path%3D$source_sch")

# Run one SQL/psql command ($1) against the control schema of the target db.
run_control() { "${control_con[@]}" -c "$1"; }

# Run one SQL/psql command ($1) against the target schema of the target db.
run_target() { "${trx_con[@]}" -c "$1"; }

# Run one SQL/psql command ($1) against the source database.
run_source() { "${source_con[@]}" -c "$1"; }

# Print column $2 of the trx_ctl_jobrun row whose job_id is $1.
job_field() {
  run_control "select $2 from trx_ctl_jobrun where job_id=$1;"
}

# Print the number of jobs still waiting to be processed.
pending_jobs() {
  run_control "select count(process_flag) from trx_ctl_jobrun where process_flag=0;"
}

process_count=$(pending_jobs)

# One log row per configured job for this run.
run_control "insert into trx_job_log_status(client_id,function_id,job_id) select client_id,function_id,job_id from trx_ctl_jobrun;"

while (( process_count > 0 )); do
  # Pick the pending job with the lowest dependant_on value.
  min_dependant_on=$(run_control "select min(dependant_on) from trx_ctl_jobrun where process_flag=0 limit 1;")
  [[ -n "$min_dependant_on" ]] || die "pending jobs have no dependant_on value"
  job_id=$(run_control "select job_id from trx_ctl_jobrun where dependant_on=$min_dependant_on and process_flag=0 limit 1;")
  [[ -n "$job_id" ]] || die "no pending job found for dependant_on=$min_dependant_on"

  # Job definition.
  source_sql=$(job_field "$job_id" source_sql)
  source_sql_suffix=$(job_field "$job_id" source_sql_suffix)
  last_run_date=$(job_field "$job_id" last_run_date)
  transactions=$(job_field "$job_id" transactions)
  primary_table=$(job_field "$job_id" primary_table)
  primary_table_id=$(job_field "$job_id" primary_table_id)
  primary_timestamp=$(job_field "$job_id" primary_table_timestamp)
  last_run_id=$(job_field "$job_id" last_run_id)
  target_table=$(job_field "$job_id" target_table)

  [[ -n "$last_run_id" ]] || die "job $job_id has no last_run_id"
  transactions=${transactions:-0}   # NULL batch size: load in a single batch

  # Upper watermark of the source table at the start of this job.
  primary_table_max_id=$(run_source "select max($primary_table_id) from $primary_table; ")
  if [[ -n "$primary_table_max_id" ]]; then
    primary_table_max_time=$(run_source "select max($primary_timestamp) from $primary_table where $primary_table_id=$primary_table_max_id ;")
  else
    # Empty source table: max() is NULL, so there is nothing to extract.
    primary_table_max_id=$last_run_id
    primary_table_max_time=$last_run_date
  fi

  current_timestamp=$(date +'%F %T')
  run_control "update trx_job_log_status set job_start_time='$current_timestamp',first_run_id=$last_run_id where job_id=$job_id;"

  # Fill in the placeholders of the job query. Parameter expansion is used
  # instead of `echo $source_sql | sed`, because the unquoted echo would
  # glob-expand a `*` in the query (select * ...) against the current directory.
  source_sql=${source_sql//last_run_id/"$last_run_id"}
  source_sql=${source_sql//last_run_date/"'$last_run_date'"}
  echo "------$source_sql"

  # Extract and load in id ranges until the watermark reaches the max id.
  while (( primary_table_max_id > last_run_id )); do
    diff=$(( primary_table_max_id - last_run_id ))
    if (( diff < transactions || transactions == 0 )); then
      echo "then"
      # Final (or only) batch: everything up to the max id seen above.
      batch_to=$primary_table_max_id
      batch_time=$primary_table_max_time
    else
      echo "else"
      batch_to=$(( last_run_id + transactions ))
      # Timestamp of the highest existing id inside the batch; the id
      # last_run_id+transactions itself may be missing when ids have gaps.
      batch_time=$(run_source "select $primary_timestamp from $primary_table where $primary_table_id=(select max($primary_table_id) from $primary_table where $primary_table_id<=$batch_to);")
      batch_time=${batch_time:-$last_run_date}
    fi

    # Both bounds are always applied, so rows inserted while the job runs are
    # picked up by the next run instead of being copied twice. No ';' before
    # the suffix: it has to stay part of the same statement.
    run_sql="${source_sql} where $primary_table_id>$last_run_id and $primary_table_id<=$batch_to and $primary_timestamp>='$last_run_date'"
    run_sql_suffix="${run_sql} ${source_sql_suffix};"

    run_source "$run_sql_suffix" > "$dataDir/$job_id.csv"
    run_target "\copy $target_table from '$dataDir/$job_id.csv' delimiter '|' NULL AS ''"

    # Advance the watermark only after the batch has been loaded; if the
    # extract or load fails, `set -e` stops here and the batch is retried on
    # the next run instead of being skipped.
    run_control "update trx_ctl_jobrun set last_run_id=$batch_to,last_run_date='$batch_time' where job_id=$job_id;"
    last_run_id=$(job_field "$job_id" last_run_id)
  done

  current_timestamp=$(date +'%F %T')
  # Two statements on purpose: expressions in an UPDATE's SET list see the
  # row's old values, so duration and rows_added must be derived after
  # job_end_time and last_run_id have been stored.
  run_control "update trx_job_log_status set job_end_time='$current_timestamp',last_run_id=$last_run_id,target_table='$target_table' where job_status=0 and job_id=$job_id"
  run_control "update trx_job_log_status set duration=EXTRACT(EPOCH FROM (job_end_time-job_start_time)),rows_added=last_run_id-first_run_id,job_status=1 where job_id=$job_id and job_status=0;"

  run_control "update trx_ctl_jobrun set process_flag=1 where job_id=$job_id;"
  process_count=$(pending_jobs)
done

echo "nothing"