Retail ETL jobs
This commit is contained in:
74
sql/data_rollup_v1.0.sql
Executable file
74
sql/data_rollup_v1.0.sql
Executable file
@@ -0,0 +1,74 @@
|
||||
update fw_ods.fw_ods_monthly ods
|
||||
set d4l1_date = new_date,
|
||||
d4l1_yyyy = new_year,
|
||||
d4l1_qq = new_quarter,
|
||||
d4l1_mm = new_month,
|
||||
d4l1_yyyymm = new_yyyymm::int,
|
||||
update_timestamp = now()
|
||||
from
|
||||
(
|
||||
with tbl_results as (
|
||||
with tbl as (
|
||||
select date_trunc('month', max(d4l1_date))::date max_d4l1 from fw_ods.fw_ods_monthly where client_id = 10
|
||||
)
|
||||
select (EXTRACT(year FROM age(now(),max_d4l1))*12 + EXTRACT(month FROM age(now(),max_d4l1))-1) num_month from tbl
|
||||
)
|
||||
|
||||
select distinct d4l1_date, (d4l1_date + num_month * interval '1 months')::date new_date,
|
||||
date_part('year', (d4l1_date + num_month * interval '1 months')::date) new_year,
|
||||
date_part('quarter', (d4l1_date + num_month * interval '1 months')::date) new_quarter,
|
||||
date_part('month', (d4l1_date + num_month * interval '1 months')::date) new_month,
|
||||
concat(date_part('year', (d4l1_date + num_month * interval '1 months')::date),lpad(date_part('month', (d4l1_date + num_month * interval '1 months')::date)::varchar,2,'0')) new_yyyymm
|
||||
from fw_ods.fw_ods_monthly a, tbl_results b where client_id = 10
|
||||
) new
|
||||
where ods.d4l1_date = new.d4l1_date and ods.client_id = 10;
|
||||
|
||||
/* update fw_ods_viz_data_generic data excluding waterfall data */
|
||||
|
||||
update fw_ods.fw_ods_viz_data_generic ods
|
||||
set from_date = new_date_from,
|
||||
to_date = new_date_to,
|
||||
update_timestamp = now()
|
||||
from
|
||||
(
|
||||
with tbl_results as (
|
||||
with tbl as (
|
||||
select date_trunc('month', max(from_date))::date max_d4l1 from fw_ods.fw_ods_viz_data_generic where client_id = 10 and viztype_name <> 'Waterfall'
|
||||
)
|
||||
select (EXTRACT(year FROM age(now(),max_d4l1))*12 + EXTRACT(month FROM age(now(),max_d4l1))-1) num_month from tbl
|
||||
)
|
||||
|
||||
select distinct from_date, to_date,
|
||||
(from_date + num_month * interval '1 months')::date new_date_from,
|
||||
(to_date + num_month * interval '1 months')::date new_date_to
|
||||
from fw_ods.fw_ods_viz_data_generic a, tbl_results b where client_id = 10 and viztype_name <> 'Waterfall'
|
||||
) new
|
||||
where ods.from_date = new.from_date and ods.to_date = new.to_date and ods.client_id = 10 and viztype_name <> 'Waterfall';
|
||||
|
||||
/* update waterfall data */
|
||||
|
||||
update fw_ods.fw_ods_viz_data_generic ods
|
||||
set from_date = new_date_from,
|
||||
to_date = new_date_to,
|
||||
update_timestamp = now()
|
||||
from
|
||||
(
|
||||
with tbl_results as (
|
||||
with tbl as (
|
||||
select date_trunc('month', max(from_date))::date max_d4l1 from fw_ods.fw_ods_viz_data_generic where client_id = 10 and viztype_name= 'Waterfall'
|
||||
)
|
||||
select (EXTRACT(year FROM age(now(),max_d4l1))*12 + EXTRACT(month FROM age(now(),max_d4l1))-1) num_month from tbl
|
||||
)
|
||||
select distinct from_date, to_date,
|
||||
(from_date + num_month * interval '1 months')::date new_date_from,
|
||||
(to_date + num_month * interval '1 months')::date new_date_to
|
||||
from fw_ods.fw_ods_viz_data_generic a, tbl_results b where client_id = 10 and viztype_name= 'Waterfall'
|
||||
) new
|
||||
where ods.from_date = new.from_date and ods.to_date = new.to_date and ods.client_id = 10 and viztype_name= 'Waterfall';
|
||||
|
||||
update fw_ods.fw_ods_viz_data_generic ods
|
||||
set va1 = concat(EXTRACT(year FROM from_date),'-', lpad(EXTRACT(month FROM from_date)::text,2,'0')),
|
||||
va2 = concat(EXTRACT(year FROM (from_date - 1 * interval '1 months')::date),'-', lpad(EXTRACT(month FROM (from_date - 1 * interval '1 months')::date)::text,2,'0'))
|
||||
where client_id =10 and viztype_name= 'Waterfall';
|
||||
|
||||
update fw_core.fw_jobctl_runschedule_jobstep set end_status_note ='retail_update_completed', end_status='success',end_time = now() where job_script_type = 'sql' and latest_runschedule_flag='1' and job_name= 'Retail Data Update';
|
||||
105
sql/scripts/job_load.sh
Executable file
105
sql/scripts/job_load.sh
Executable file
@@ -0,0 +1,105 @@
|
||||
#/****************************************************************
|
||||
#****** ForeWarn Licensing and / or usage Terms and Conditions *****
|
||||
#ForeWarn. The NextGen Insights Solution Platform
|
||||
|
||||
#Copyright ▒ 2021 COMPEGENCE. All Rights Reserved
|
||||
|
||||
#ForeWarn is a product of COMPEGENCE.
|
||||
|
||||
#To be used only with a valid license from COMPEGENCE
|
||||
|
||||
#www.compegence.com info@compegence.com
|
||||
#****************************************************************/
|
||||
|
||||
|
||||
#!/bin/bash -e
|
||||
#HOME="/home/compegence/customer"
|
||||
#Directories
|
||||
homeDir=$HOME/dev/retail
|
||||
configDir=$homeDir/conf
|
||||
input_file=$configDir/server_credentials.txt
|
||||
declare -A credentials
|
||||
while IFS="=" read -r key value; do
|
||||
credentials[$key]=$value
|
||||
done < <( sed -e '/^\s*$/ d' -e '/^#/ d' $input_file )
|
||||
|
||||
|
||||
user=${credentials[user]}
|
||||
password=${credentials[password]}
|
||||
host=${credentials[host]}
|
||||
db=${credentials[db]}
|
||||
protocol=${credentials[protocol]}
|
||||
server=${credentials[server]}
|
||||
|
||||
|
||||
|
||||
#let i=0
|
||||
#set -A arr
|
||||
#BASE_CODS_ARGUMENTS="--host=$cods_host --user=$cods_user --password=$cods_pwd --port=$3306"
|
||||
#BASE_ODS_ARGUMENTS="--user=$user --password=$pwd --host=$host"
|
||||
psql_base_arg="postgresql://$user:$password@$host/$db"
|
||||
#psql_base_arg="postgresql://postgres:j3(jLBq}e@localhost/ffm_k2"
|
||||
#echo $psql_base_arg
|
||||
|
||||
ifStart=`date '+%d'`
|
||||
#echo "$ifStart"
|
||||
|
||||
|
||||
#Check today is sunday. Scheduling weekly job
|
||||
today="$(date +%a)"
|
||||
sun="Mon"
|
||||
|
||||
psql $psql_base_arg -q -c "update fw_core.fw_jobctl_runschedule set latest_runschedule_flag='0' where latest_runschedule_flag='1' and client_id=$1 and function_id=$2;"
|
||||
psql $psql_base_arg -q -c "update fw_core.fw_jobctl_runschedule_jobstep set latest_runschedule_flag='0' where latest_runschedule_flag='1' and client_id=$1 and function_id=$2;"
|
||||
psql $psql_base_arg -q -c "update fw_core.fw_jobctl_file_runschedule set latest_runschedule_flag='0' where latest_runschedule_flag='1' and client_id=$1 and function_id=$2;"
|
||||
psql $psql_base_arg -q -c "update fw_core.fw_jobctl_file_sheet_runschedule set latest_runschedule_flag='0' where latest_runschedule_flag='1' and client_id=$1 and function_id=$2;"
|
||||
|
||||
#inserting into fw_core.fw_jobctl_runschedule
|
||||
chkrun=$(psql $psql_base_arg -t -c "select count(*) from fw_core.fw_jobctl_jobstep_master where client_id=$1 and function_id=$2 and run_frequency='daily' and active_flag=true;")
|
||||
if [[ $chkrun -gt 0 ]]; then
|
||||
psql $psql_base_arg -q -c "INSERT INTO fw_core.fw_jobctl_runschedule
|
||||
(client_id, function_id, run_schedule_timestamp, run_frequency, start_time, begin_status, created_by, updated_by, create_timestamp, update_timestamp)
|
||||
select client_id, function_id,now(),run_frequency,now(),'','Admin','Admin',now(),now() from fw_core.fw_jobctl_jobstep_master
|
||||
where active_flag='TRUE' group by client_id, function_id, run_frequency limit 1;"
|
||||
|
||||
#check previous day job status in fw_jobctl_runschedule
|
||||
chkrun2=$(psql $psql_base_arg -t -c "select trim(end_status) from fw_core.fw_jobctl_runschedule where run_schedule_id = (
|
||||
select max(run_schedule_id) as max from fw_core.fw_jobctl_runschedule where latest_runschedule_flag='0' and client_id=$1 and function_id=$2);")
|
||||
|
||||
if [ "$chkrun2" != " error" ]; then
|
||||
|
||||
#Update fw_jobctl_runschedule
|
||||
psql $psql_base_arg -q -c "update fw_core.fw_jobctl_runschedule set begin_status='started' where client_id=$1 and function_id=$2 and run_frequency='daily' and latest_runschedule_flag='1' "
|
||||
else
|
||||
psql $psql_base_arg -q -c "update fw_core.fw_jobctl_runschedule set begin_status='started',end_status='error',end_status_note = 'previous day job failed' where client_id=$1 and function_id=$2 and run_frequency='daily' and latest_runschedule_flag='1' "
|
||||
echo "welcome2"
|
||||
exit 1;
|
||||
fi
|
||||
else
|
||||
echo " No active job for daily for client_id=$1"
|
||||
exit 1;
|
||||
fi
|
||||
|
||||
# looping fw_jobctl_jobstep_master for active records
|
||||
while IFS="|" read -a line
|
||||
do
|
||||
run_frequency=${line[2]}
|
||||
job_scheduling_day="${line[1]}"
|
||||
|
||||
#echo "$run_frequency"
|
||||
if [ $run_frequency == "daily" ] ; then
|
||||
echo "Populating daily job into fw_jobctl_runschedule_jobstep"
|
||||
|
||||
psql $psql_base_arg -q -c"insert into fw_core.fw_jobctl_runschedule_jobstep(client_id, function_id,latest_runschedule_flag,run_schedule_id,run_schedule_timestamp, run_frequency, job_id, step_id, job_step_run_dependency_seuqence,
|
||||
job_script_type,job_step_script_name,job_fun_param_array,job_name, step_name, job_scope, job_scope_qualifier_array, created_by,updated_by, create_timestamp, update_timestamp)
|
||||
select a.client_id, a.function_id,b.latest_runschedule_flag, b.run_schedule_id,b.run_schedule_timestamp,a.run_frequency, a.job_id, a.step_id, a.job_step_run_dependency_seuqence,
|
||||
a.job_script_type,a.job_step_script_name,a.job_fun_param_array,a.job_name,a.step_name,a.job_scope, a.job_scope_qualifier_array, 'Admin','Admin',now(),now()
|
||||
from fw_core.fw_jobctl_jobstep_master a join fw_core.fw_jobctl_runschedule b
|
||||
on a.client_id=b.client_id and a.function_id=b.function_id and a.run_frequency=b.run_frequency
|
||||
where latest_runschedule_flag='1' and b.begin_status = 'started' and b.end_status is null and a.active_flag='true' and a.run_frequency='daily'
|
||||
order by a.client_id,a.function_id,a.job_id,a.step_id,a.job_step_run_dependency_seuqence;"
|
||||
|
||||
fi
|
||||
done < <(psql $psql_base_arg -At -c"select client_id,function_id,run_frequency from fw_core.fw_jobctl_jobstep_master where client_id=$1 and function_id=$2 order by run_frequency ")
|
||||
|
||||
|
||||
176
sql/scripts/run_schedule.sh
Executable file
176
sql/scripts/run_schedule.sh
Executable file
@@ -0,0 +1,176 @@
|
||||
#!/bin/bash -e
|
||||
|
||||
#/****************************************************************
|
||||
#****** ForeWarn Licensing and / or usage Terms and Conditions *****
|
||||
#ForeWarn. The NextGen Insights Solution Platform
|
||||
|
||||
#Copyright ▒ 2021 COMPEGENCE. All Rights Reserved
|
||||
|
||||
#ForeWarn is a product of COMPEGENCE.
|
||||
|
||||
#To be used only with a valid license from COMPEGENCE
|
||||
|
||||
#www.compegence.com info@compegence.com
|
||||
#****************************************************************/
|
||||
|
||||
#HOME="/home/compegence/customer"
|
||||
#Directories
|
||||
homeDir=$HOME/dev/retail
|
||||
configDir=$homeDir/conf
|
||||
input_file=$configDir/server_credentials.txt
|
||||
declare -A credentials
|
||||
while IFS="=" read -r key value; do
|
||||
credentials[$key]=$value
|
||||
done < <( sed -e '/^\s*$/ d' -e '/^#/ d' $input_file )
|
||||
|
||||
|
||||
user=${credentials[user]}
|
||||
password=${credentials[password]}
|
||||
host=${credentials[host]}
|
||||
db=${credentials[db]}
|
||||
protocol=${credentials[protocol]}
|
||||
server=${credentials[server]}
|
||||
|
||||
|
||||
|
||||
psql_base_arg="postgresql://$user:$password@$host/$db"
|
||||
|
||||
echo "first arg: $1"
|
||||
echo "second arg: $2"
|
||||
|
||||
# load the record into the fw_jobctl_runschedule accounting for run frequency (hourly, daily, monthly, yearly) and insert
|
||||
$homeDir/sql/scripts/job_load.sh $1 $2
|
||||
|
||||
##***** Run frequency loop; Outer Loop
|
||||
#while loop for each client_id,function_id,run_frequency from fw_jobctl_runschedule_jobstep
|
||||
|
||||
while IFS="|" read -a outer
|
||||
do
|
||||
client_id=${outer[0]}
|
||||
function_id=${outer[1]}
|
||||
run_frequency=${outer[2]}
|
||||
|
||||
#Update start_time for fw_jobctl_runschedule
|
||||
#psql $psql_base_arg -q -c "update fw_core.fw_jobctl_runschedule set start_time=now(),begin_status='started' where latest_runschedule_flag='1' and client_id=$client_id and function_id=$function_id and run_frequency='$run_frequency' and end_status is null "
|
||||
|
||||
##** Job and Step Loop; innner loop;
|
||||
#while loop for each job & step
|
||||
##** Job and Step Loop; innner loop; based on the entries on the table fw_jobctl_runschedule_jobstep
|
||||
|
||||
job_status=0
|
||||
prev_job_seq=''
|
||||
file_count=0
|
||||
while IFS="|" read -a inner
|
||||
do
|
||||
client_id=${inner[0]}
|
||||
function_id=${inner[1]}
|
||||
run_frequency=${inner[2]}
|
||||
job_id=${inner[4]}
|
||||
step_id=${inner[5]}
|
||||
job_name=${inner[6]}
|
||||
#step_name=(${inner[7]})
|
||||
job_script_type=${inner[8]}
|
||||
job_step_script_name=${inner[9]}
|
||||
#remove first and last char - curly braces
|
||||
job_fun_param_array=${inner[10]:1:-1}
|
||||
job_step_run_dependency_seuqence=${inner[11]}
|
||||
|
||||
param=''
|
||||
var1=' '
|
||||
job_step_script_name_concat=$job_step_script_name'('$client_id,$function_id,$job_id,$step_id')'
|
||||
|
||||
#reading array column and spliting each var and appending $ & space
|
||||
IFS=',' read -r -a ADDR <<< "$job_fun_param"
|
||||
for i in "${ADDR[@]}"; do
|
||||
param=$param$i$var1
|
||||
done
|
||||
|
||||
|
||||
|
||||
##get file count
|
||||
file_count=$(psql $psql_base_arg -t -c "select object_count from fw_core.fw_jobctl_runschedule where latest_runschedule_flag='1';")
|
||||
echo "file-count is $file_count"
|
||||
|
||||
echo "Executing Job: $job_name by checking Previous End Status"
|
||||
## Process java Program
|
||||
if [[ $job_script_type == "java" ]]; then
|
||||
psql $psql_base_arg -t -q -c "update fw_core.fw_jobctl_runschedule_jobstep set start_time=now(),begin_status='started' where job_step_script_name='$job_step_script_name' and client_id=$client_id and function_id=$function_id and run_frequency='$run_frequency' and latest_runschedule_flag='1' "
|
||||
echo "--$prev_job_seq"
|
||||
if [[ $prev_job_seq == '' ]]; then
|
||||
echo "calling java jar"
|
||||
java -jar $job_step_script_name
|
||||
|
||||
else
|
||||
chkrun=$(psql $psql_base_arg -t -c "select TRIM(end_status) from fw_core.fw_jobctl_runschedule_jobstep where latest_runschedule_flag='1' and job_step_run_dependency_seuqence=$prev_job_seq and client_id=$1 and function_id=$2;")
|
||||
echo "chk $chkrun"
|
||||
if [[ "$chkrun" == " success" ]]; then
|
||||
java -jar $job_step_script_name
|
||||
else
|
||||
exit 1;
|
||||
fi
|
||||
fi
|
||||
|
||||
## Process sql files
|
||||
elif [[ $job_script_type == "sql" ]] ; then
|
||||
echo "enter into sql"
|
||||
psql $psql_base_arg -t -q -c "update fw_core.fw_jobctl_runschedule_jobstep set start_time=now(),begin_status='started' where job_step_script_name='$job_step_script_name' and client_id=$client_id and function_id=$function_id and run_frequency='$run_frequency' and latest_runschedule_flag='1' "
|
||||
if [[ $prev_job_seq == '' ]]; then
|
||||
echo "enter into sql1"
|
||||
|
||||
psql $psql_base_arg -a -f $job_step_script_name $host $port $user $pwd $dbname $param $client_id $function_id $data_from_date $data_to_date $job_scope $job_scope_qualifier
|
||||
else
|
||||
chkrun=$(psql $psql_base_arg -t -c "select TRIM(end_status) from fw_core.fw_jobctl_runschedule_jobstep where latest_runschedule_flag='1' and job_step_run_dependency_seuqence=$prev_job_seq and client_id=$1 and function_id=$2;")
|
||||
if [[ "$chkrun" == " success" ]]; then
|
||||
echo "enter into sql2"
|
||||
|
||||
psql $psql_base_arg -a -f $job_step_script_name $host $port $user $pwd $dbname $param $client_id $function_id $data_from_date $data_to_date $job_scope $job_scope_qualifier
|
||||
else
|
||||
exit 1;
|
||||
fi
|
||||
fi
|
||||
|
||||
## Process shell_script Program
|
||||
elif [[ $job_script_type == "shell_script" ]] ; then
|
||||
psql $psql_base_arg -t -q -c "update fw_core.fw_jobctl_runschedule_jobstep set start_time=now(),begin_status='started' where job_step_script_name='$job_step_script_name' and client_id=$client_id and function_id=$function_id and run_frequency='$run_frequency' and latest_runschedule_flag='1' "
|
||||
if [[ $prev_job_seq == '' ]]; then
|
||||
$job_step_script_name
|
||||
else
|
||||
chkrun=$(psql $psql_base_arg -t -c "select TRIM(end_status) from fw_core.fw_jobctl_runschedule_jobstep where latest_runschedule_flag='1' and job_step_run_dependency_seuqence=$prev_job_seq and client_id=$1 and function_id=$2;")
|
||||
|
||||
if [[ "$chkrun" == " success" && $file_count -gt 0 ]]; then
|
||||
$job_step_script_name
|
||||
else
|
||||
exit 1;
|
||||
fi
|
||||
fi
|
||||
|
||||
## Process sql fn Program
|
||||
elif [[ $job_script_type == "sql_fn" ]] ; then
|
||||
psql $psql_base_arg -t -q -c "update fw_core.fw_jobctl_runschedule_jobstep set start_time=now(),begin_status='started' where job_step_script_name='$job_step_script_name' and client_id=$client_id and function_id=$function_id and run_frequency='$run_frequency' and latest_runschedule_flag='1' "
|
||||
if [[ $prev_job_seq == '' ]]; then
|
||||
psql $psql_base_arg -c "select $job_step_script_name_concat"
|
||||
else
|
||||
|
||||
chkrun=$(psql $psql_base_arg -t -c "select TRIM(end_status) from fw_core.fw_jobctl_runschedule_jobstep where latest_runschedule_flag='1' and job_step_run_dependency_seuqence=$prev_job_seq and client_id=$1 and function_id=$2;")
|
||||
if [[ "$chkrun" == " success" && $file_count -gt 0 ]]; then
|
||||
psql $psql_base_arg -t -c "select $job_step_script_name_concat"
|
||||
else
|
||||
exit 1;
|
||||
fi
|
||||
fi
|
||||
fi
|
||||
|
||||
prev_job_seq=$job_step_run_dependency_seuqence
|
||||
## End of While Loop
|
||||
#2nd looping through fw_core.fw_jobctl_runschedule_jobstep for each client_id,function_id,run_frequency,run_schedule_id1 is completed
|
||||
done < <(psql $psql_base_arg -At -c"select client_id,function_id,run_frequency,run_schedule_id,job_id,step_id,job_name,step_name,job_script_type,job_step_script_name,job_fun_param_array,job_step_run_dependency_seuqence from fw_core.fw_jobctl_runschedule_jobstep where latest_runschedule_flag='1' and client_id=$client_id and function_id=$function_id and run_frequency='$run_frequency' order by client_id,function_id,run_schedule_id,job_id,step_id,job_step_run_dependency_seuqence; ")
|
||||
|
||||
|
||||
#Update end_time for fw_jobctl_runschedule
|
||||
psql $psql_base_arg -q -c "update fw_core.fw_jobctl_runschedule set end_status='success',end_status_note='run_completed' ,latest_runschedule_flag='0',end_time=now() where client_id=$client_id and function_id=$function_id and run_frequency='$run_frequency' and latest_runschedule_flag='1' "
|
||||
|
||||
#1st loop for client_id,function_id,run_frequency
|
||||
|
||||
done < <(psql $psql_base_arg -At -c"select client_id,function_id,run_frequency from fw_core.fw_jobctl_runschedule_jobstep where client_id=$1 and function_id=$2 and latest_runschedule_flag='1' order by client_id,function_id,run_frequency ")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user