Notes
Notes - notes.io |
df_tp_ro_operacionessbs_unicas_01 = ro_7_df
.select(
col("numregistroopebcp"),
col("codmes"),
col("codsecuencial"),
col("codclavectaordenante"),
col("codcanal"),
col("descanal"),
col("codclaveunicoclibeneficiario"),
col("codclaveunicocliordenante"),
col("codclaveunicoclisolicitante"),
col("codclavectabeneficiario"),
col("codclavectasolicitante"),
col("codinternocomputacionalbeneficiario"),
col("codinternocomputacionalordenante"),
col("codinternocomputacionalsolicitante"),
col("tiprolclibeneficiario"),
col("tiprolcliordenante"),
col("tiprolclisolicitante"),
col("codclavepartyclibeneficiario"),
col("codclavepartycliordenante"),
col("codclavepartyclisolicitante"),
col("codmoneda"),
col("codubigeo"),
col("codappros"),
col("codtrxbcp"),
col("fecdia"),
col("flgtrxcompensacionelectronica"),
col("flgtrxinternacional"),
col("flgumbraltrx"),
col("hortrx"),
col("mtocambiodolarros"),
col("mtotrx"),
col("mtotrxdol"),
col("mtodestino"),
col("numtrxapp"),
col("tipopereportesbs"),
col("codmonedadestino"),
col("codcargoclibeneficiario"),
col("codcargocliordenante"),
col("codcargoclisolicitante"),
col("codclavepartycliperbeneficiario"),
col("codclavepartycliperordenante"),
col("codclavepartyclipersolicitante"),
col("coddistritobeneficiario"),
col("coddistritoordenante"),
col("coddistritosolicitante"),
col("codpaisdestinosbs"),
col("codpaisemisorsbs"),
col("codswiftbcoexterioremisor"),
col("codswiftbcoexteriordestino"),
col("numcheque"),
col("nbrocupacionclibeneficiario"),
col("nbrocupacioncliordenante"),
col("nbrocupacionclisolicitante"),
col("flgumbralbeneficiario"),
col("flgumbralordenante"),
col("flgumbralsolicitante"),
col("codclavepartyage"),
col("tipclibeneficiario"),
col("tipcliordenante"),
col("tipclisolicitante"),
col("tippartyidentificacionbeneficiario"),
col("tippartyidentificacionordenante"),
col("tippartyidentificacionsolicitante"),
col("tipindicadorlavadoactivo"),
col("codpais_beneficiario"),
col("codpais_ordenante"),
col("codpais_solicitante"),
col("mtotrx_sbs"),
col("codmoneda_sbs"),
col("mtotrx_sbs_dol"),
col("mtocambioalnuevosol"),
col("codprofesion_beneficiario"),
col("codprofesion_ordenante"),
col("codprofesion_solicitante"),
col("codsucage"),
col("flgtrxefectivo"),
col("flgresidentesolicitante"),
col("flgresidenteordenante"),
col("flgresidentebeneficiario"),
col("tiprelpersonabcosolicitante"),
col("tiprelpersonabcoordenante"),
col("tiprelpersonabcobeneficiario"),
col("desorigendinero"),
lit("0").alias("m_tipmancomuno"),
lit("1").alias("m_origenro"),
lit("0").alias("m_codclavecta"),
lit(0).alias("m_numcodclavepartycli_o"),
lit(0).alias("m_numcodclavepartycli_b"),
lit(0).alias("m_codclavepartycli_ori"),
lit('').alias("m_codclaveunicocli_ori"),
col("codbcodestinotrx")
)
return df_tp_ro_operacionessbs_unicas_01
def __gen_ro_base_cuenta(self, ro_operacionessbs_unicas_01_df):
df_tp_ro_base_cuenta = (ro_operacionessbs_unicas_01_df.filter((col("codclavectaordenante").isNotNull()))
.select(
col("codclavectaordenante").alias("codclavecta"),
col("codappros")
).union(
ro_operacionessbs_unicas_01_df.filter((col("codclavectabeneficiario").isNotNull()))
.select(
col("codclavectabeneficiario").alias("codclavecta"),
col("codappros")
)
)).dropDuplicates(["codclavecta", "codappros"])
return df_tp_ro_base_cuenta
def __join_basecuenta_relcuentafinancieracli(self, ro_basecuenta_df, relcuentafinancieracli_df):
window_spec = Window.partitionBy(trim(col("b.codclavecta"))).orderBy(col("b.codclavecta"))
df_tp_ro_base_relacion = (ro_basecuenta_df.alias("a")
.join(relcuentafinancieracli_df.alias("b"),
col("a.codclavecta") == col("b.codclavecta"), "inner")
.select(
when(col("tiprelctafinancieracli") == "TIT", lit(self.S_CONS)).otherwise(lit(self.N_CONS)).alias(
"flgtitular"),
col("b.codclavepartycli"),
col("b.tiprolcli"),
col("a.codclavecta"),
row_number().over(window_spec).alias("rn"))
.dropDuplicates(["flgtitular", "codclavepartycli", "tiprolcli", "codclavecta"]))
return df_tp_ro_base_relacion
def __filter_ro_base_relacion(self, ro_base_relacion_df):
df_tp_ro_base_mancomuno = ro_base_relacion_df.groupBy(col("codclavecta")).agg(
count(lit(1)).alias("nummancomuno"))
.filter(col("nummancomuno") > 1)
return df_tp_ro_base_mancomuno
def __join_ro_base_relacion_base_mancomuno(self, ro_base_relacion_df, ro_base_mancomuno_df):
df_tp_ro_relacion_mancomuno = (ro_base_relacion_df.alias("a")
.join(ro_base_mancomuno_df.alias("b"), col("a.codclavecta") == col("b.codclavecta"), "inner")
.select(
col("a.codclavecta"),
col("a.codclavepartycli"),
col("a.tiprolcli"),
col("a.flgtitular"),
col("b.nummancomuno"),
lit(' ').alias("codstatus")))
return df_tp_ro_relacion_mancomuno
def __gen_base_codclaveparty_clientes(self, ro_operacionessbs_unicas_01_df, ro_relacion_mancomuno_df):
df_tp_ro_codclavepartycli_cliente = (ro_operacionessbs_unicas_01_df
.select(
col("codclavepartyclisolicitante").alias("codclavepartycli"),
col("tiprolclisolicitante").alias("tiprolcli"))
.union(
ro_operacionessbs_unicas_01_df.select(col("codclavepartycliordenante").alias("codclavepartycli"),
col("tiprolcliordenante").alias("tiprolcli")))
.union(
ro_operacionessbs_unicas_01_df.select(col("codclavepartyclibeneficiario").alias("codclavepartycli"),
col("tiprolclibeneficiario").alias("tiprolcli")))
.union(
ro_relacion_mancomuno_df.select(col("codclavepartycli"), col("tiprolcli")))).dropDuplicates(
["codclavepartycli", "tiprolcli"])
return df_tp_ro_codclavepartycli_cliente
def __join_ro_operacionessbs_unicas_relacion_mancomuno(self, ro_operacionessbs_unicas_01_df,
ro_relacion_mancomuno_df):
df_tp_ro_rel_mancomunos = (ro_operacionessbs_unicas_01_df.alias("a")
.join(ro_relacion_mancomuno_df.alias("b"),
(col("a.codclavectaordenante") == col("b.codclavecta")) &
(col("a.codclavepartycliordenante") != col("codclavepartycli")) &
(trim(col("a.codclavepartycliordenante")) != "") &
(trim(col("a.codclavepartycliordenante")) != ".") &
(col("a.codclavepartycliordenante").isNotNull()) &
(col("a.codclavectaordenante").isNotNull()) &
(trim(col("a.codclavectaordenante")) != "") &
(trim(col("a.codclavectaordenante")) != "."), "inner")
.select(
lit("2").alias("tipmancomuno"),
lit("2").alias("origenro"),
col("a.numregistroopebcp"),
col("b.codclavepartycli"),
col("b.tiprolcli"),
col("a.codclavectaordenante").alias("codclavecta")).union(
ro_operacionessbs_unicas_01_df.alias("a")
.join(ro_relacion_mancomuno_df.alias("b"),
(col("a.codclavectabeneficiario") == col("b.codclavecta")) &
(col("a.codclavepartyclibeneficiario") != col("codclavepartycli")) &
(trim(col("a.codclavepartyclibeneficiario")) != "") &
(trim(col("a.codclavepartyclibeneficiario")) != ".") &
(col("a.codclavepartyclibeneficiario").isNotNull()) &
(col("a.codclavectabeneficiario").isNotNull()) &
(trim(col("a.codclavectabeneficiario")) != "") &
(trim(col("a.codclavectabeneficiario")) != "."), "inner")
.select(
lit("3").alias("tipmancomuno"),
lit("2").alias("origenro"),
col("a.numregistroopebcp"),
col("b.codclavepartycli"),
col("b.tiprolcli"),
col("a.codclavectabeneficiario").alias("codclavecta"))))
return df_tp_ro_rel_mancomunos
def __gen_ro_nummancomunos(self, ro_rel_mancomunos_df):
df_tp_ro_nummancomunos = (
ro_rel_mancomunos_df.dropDuplicates(["numregistroopebcp", "tipmancomuno", "codclavepartycli"])
.groupBy(col("numregistroopebcp"), col("tipmancomuno"))
.agg(
count(lit(1)).alias("numcic")))
Utils.logger_info(self, f"-> Inicio 1.1 escritura.")
ro_rel_mancomunos_df.write.mode("overwrite").format("parquet").save(
self.ruta + "TMP_SEGMENTACIONOPERSOSPECHOSA_01_1/")
df_1 = self.spark.read.format("parquet").load(self.ruta + "TMP_SEGMENTACIONOPERSOSPECHOSA_01_1/")
Utils.logger_info(self, f"-> Fin 1.1 escritura.")
return df_1
def __gen_ro_res_nummancomunos(self, ro_nummancomunos_df):
df_tp_ro_res_nummancomunos = (ro_nummancomunos_df
.groupBy("numregistroopebcp")
.agg(
sum(
when(col("tipmancomuno") == "2", col("numcic")).otherwise(lit(0))
).alias("numcic_o"),
sum(
when(col("tipmancomuno") == "3", col("numcic")).otherwise(lit(0))
).alias("numcic_b"))
.filter((col("numcic_o") > 0) | (col("numcic_b") > 0)))
return df_tp_ro_res_nummancomunos
def __gen_ro_base_fil_nummancomunos(self, ro_rel_mancomunos_df, ro_res_nummancomunos_df, cliente_df):
df_tp_ro_base_fil_nummancomunos = (ro_rel_mancomunos_df.alias("a")
.join(ro_res_nummancomunos_df.alias("b"), col("a.numregistroopebcp") == col("b.numregistroopebcp"), "inner")
.join(cliente_df.alias("c"), (col("a.codclavepartycli") == col("c.codclavepartycli")) &
(col("a.tiprolcli") == col("c.tiprolcli")), "left")
.select(
col("a.tipmancomuno"),
col("a.origenro"),
col("a.numregistroopebcp"),
col("a.codclavepartycli"),
col("a.tiprolcli"),
col("c.codclaveunicocli"),
col("a.codclavecta"),
col("b.numcic_o"),
col("b.numcic_b")))
return df_tp_ro_base_fil_nummancomunos
def __gen_ro_operacionessbs_unicas_02(self, ro_operacionessbs_unicas_01_df, ro_base_fil_nummancomunos_df):
df_tp_ro_operacionessbs_unicas_02 = (ro_operacionessbs_unicas_01_df.alias("a")
.join(ro_base_fil_nummancomunos_df.alias("b"), col("a.numregistroopebcp") == col("b.numregistroopebcp"),
"inner")
.select(
col("a.numregistroopebcp"),
col("a.codmes"),
col("a.codsecuencial"),
col("a.codclavectaordenante"),
col("a.codcanal"),
col("a.descanal"),
when(col("b.tipmancomuno") == '3', col("b.codclaveunicocli")).otherwise(
col("a.codclaveunicoclibeneficiario")).alias("codclaveunicoclibeneficiario"),
when(col("b.tipmancomuno") == '2', col("b.codclaveunicocli")).otherwise(
col("a.codclaveunicocliordenante")).alias("codclaveunicocliordenante"),
col("a.codclaveunicoclisolicitante"),
col("a.codclavectabeneficiario"),
col("a.codclavectasolicitante"),
col("a.codinternocomputacionalbeneficiario"),
col("a.codinternocomputacionalordenante"),
col("a.codinternocomputacionalsolicitante"),
col("a.tiprolclibeneficiario"),
col("a.tiprolcliordenante"),
col("a.tiprolclisolicitante"),
when(col("b.tipmancomuno") == '3', col("b.codclavepartycli")).otherwise(
col("a.codclavepartyclibeneficiario")).alias("codclavepartyclibeneficiario"),
when(col("b.tipmancomuno") == '2', col("b.codclavepartycli")).otherwise(
col("a.codclavepartycliordenante")).alias("codclavepartycliordenante"),
col("a.codclavepartyclisolicitante"),
col("a.codmoneda"),
col("a.codubigeo"),
col("a.codappros"),
col("a.codtrxbcp"),
col("a.fecdia"),
col("a.flgtrxcompensacionelectronica"),
col("a.flgtrxinternacional"),
lit(".").alias("flgumbraltrx"),
col("a.hortrx"),
col("a.mtocambiodolarros"),
col("a.mtotrx"),
col("a.mtotrxdol"),
col("a.mtodestino"),
col("a.numtrxapp"),
col("a.tipopereportesbs"),
col("a.codmonedadestino"),
col("a.codcargoclibeneficiario"),
col("a.codcargocliordenante"),
col("a.codcargoclisolicitante"),
col("a.codclavepartycliperbeneficiario"),
col("a.codclavepartycliperordenante"),
col("a.codclavepartyclipersolicitante"),
col("a.coddistritobeneficiario"),
col("a.coddistritoordenante"),
col("a.coddistritosolicitante"),
col("a.codpaisdestinosbs"),
col("a.codpaisemisorsbs"),
col("a.codswiftbcoexterioremisor"),
col("a.codswiftbcoexteriordestino"),
col("a.numcheque"),
col("a.nbrocupacionclibeneficiario"),
col("a.nbrocupacioncliordenante"),
col("a.nbrocupacionclisolicitante"),
lit(".").alias("flgumbralbeneficiario"),
lit(".").alias("flgumbralordenante"),
lit(".").alias("flgumbralsolicitante"),
col("a.codclavepartyage"),
col("a.tipclibeneficiario"),
col("a.tipcliordenante"),
col("a.tipclisolicitante"),
col("a.tippartyidentificacionbeneficiario"),
col("a.tippartyidentificacionordenante"),
col("a.tippartyidentificacionsolicitante"),
col("a.tipindicadorlavadoactivo"),
col("a.codpais_beneficiario"),
col("a.codpais_ordenante"),
col("a.codpais_solicitante"),
col("a.mtotrx_sbs"),
col("a.codmoneda_sbs"),
col("a.mtotrx_sbs_dol"),
col("a.mtocambioalnuevosol"),
col("a.codprofesion_beneficiario"),
col("a.codprofesion_ordenante"),
col("a.codprofesion_solicitante"),
col("a.codsucage"),
col("a.flgtrxefectivo"),
col("a.flgresidentesolicitante"),
col("a.flgresidenteordenante"),
col("a.flgresidentebeneficiario"),
col("a.tiprelpersonabcosolicitante"),
col("a.tiprelpersonabcoordenante"),
col("a.tiprelpersonabcobeneficiario"),
col("a.desorigendinero"),
col("b.tipmancomuno").alias("m_tipmancomuno"),
col("b.origenro").alias("m_origenro"),
col("b.codclavecta").alias("m_codclavecta"),
col("b.numcic_o").alias("m_numcodclavepartycli_o"),
col("b.numcic_b").alias("m_numcodclavepartycli_b"),
when(col("b.tipmancomuno") == '2', col("a.codclavepartycliordenante"))
.when(col("b.tipmancomuno") == '3', col("a.codclavepartyclibeneficiario")).alias(
"m_codclavepartycli_ori"),
when(col("b.tipmancomuno") == '2', col("a.codclaveunicocliordenante"))
.when(col("b.tipmancomuno") == '3', col("a.codclaveunicoclibeneficiario")).alias(
"m_codclaveunicocli_ori"),
col("a.codbcodestinotrx")))
return df_tp_ro_operacionessbs_unicas_02
def __union_ro_operacionessbs_unicas(self, ro_operacionessbs_unicas_01_df, ro_operacionessbs_unicas_02_df):
df_tp_ro_operacionessbs_unicas = ro_operacionessbs_unicas_01_df.union(ro_operacionessbs_unicas_02_df)
Utils.logger_info(self, f"-> Inicio segunda escritura.")
df_tp_ro_operacionessbs_unicas.write.mode("overwrite").format("parquet").save(
self.ruta + "TMP_SEGMENTACIONOPERSOSPECHOSA_02/")
df_1 = self.spark.read.format("parquet").load(self.ruta + "TMP_SEGMENTACIONOPERSOSPECHOSA_02/")
Utils.logger_info(self, f"-> Fin segunda escritura.")
return df_1
def __gen_ro_operacionessbs_unido(self, ro_operacionessbs_unicas_df, me_tipooperacionsospechosa_df, clipaisprof_df):
df_tp_ro_operacionessbs_unido = (ro_operacionessbs_unicas_df.alias("a")
.join(broadcast(me_tipooperacionsospechosa_df).alias("b"),
col("a.tipopereportesbs") == col("b.tipoperacionrossbs").cast("int"),
"left")
.join(clipaisprof_df.alias("c"),
col("a.codclavepartyclibeneficiario") == col("c.codclaveparty"), "left")
.join(clipaisprof_df.alias("d"),
col("a.codclavepartycliordenante") == col("d.codclaveparty"), "left")
.join(clipaisprof_df.alias("e"),
col("a.codclavepartyclisolicitante") == col("e.codclaveparty"), "left")
.select(
*[col(f"a.{c}") for c in ro_operacionessbs_unicas_df.columns if
c not in ["flgresidentebeneficiario", "flgresidenteordenante", "flgresidentesolicitante"]],
col("b.tipctaorigenrossbs").alias("tipctaordinario"),
when((col("a.codappros") == 'BSIO') & (col("a.tipopereportesbs") == "99"), lit("A")).otherwise(
col("b.tipcargoabono")
).alias("tipcargoabono"),
when((col("a.codappros").isin('BSIO', 'NHBK', 'NHB')) & (col("a.tipopereportesbs") == "99"),
lit("3")).otherwise(
col("b.tipctadestinorossbs")
).alias("tipctabeneficiario"),
lit('002').alias("codbco_ord"),
when(col("a.tipopereportesbs") == "25", col("a.codbcodestinotrx")).otherwise(lit('002')).alias(
"codbco_ben"),
when(col("c.codclaveparty").isNotNull(), col("c.flgresidencia")).otherwise(
col("flgresidentebeneficiario")).alias("flgresidentebeneficiario"),
when(col("d.codclaveparty").isNotNull(), col("d.flgresidencia")).otherwise(
col("flgresidenteordenante")).alias("flgresidenteordenante"),
when(col("e.codclaveparty").isNotNull(), col("e.flgresidencia")).otherwise(
col("flgresidentesolicitante")).alias("flgresidentesolicitante")
).dropDuplicates())
return df_tp_ro_operacionessbs_unido
def __gen_clientes_operaciones(self, ro_operacionessbs_unido_df):
df_clientesolicitante = ro_operacionessbs_unido_df.select(
col("codclavepartyclisolicitante").alias("codclavepartycli"),
col("tiprolclisolicitante").alias("tiprolcli"))
df_clienteordenante = ro_operacionessbs_unido_df.select(
col("codclavepartycliordenante").alias("codclavepartycli"), col("tiprolcliordenante").alias("tiprolcli"))
df_clientebeneficiario = ro_operacionessbs_unido_df.select(
col("codclavepartyclibeneficiario").alias("codclavepartycli"),
col("tiprolclibeneficiario").alias("tiprolcli"))
df_clienteoperaciones = df_clientesolicitante.union(df_clienteordenante).union(
df_clientebeneficiario).dropDuplicates()
return df_clienteoperaciones
def __join_clienteoperaciones_partydac(self, clienteoperaciones_df, partydac_df):
df_clienteoperaciones_nit = (clienteoperaciones_df.alias("a")
.join(partydac_df.alias("b"), (col("a.codclavepartycli") == col("b.codclaveparty")), "left")
.select(
col("a.codclavepartycli"),
col("a.tiprolcli"),
col("b.codnit").cast("string").alias("codnit")))
return df_clienteoperaciones_nit
def __gen_clienteoperaciones_dir(self, clientesolicitante_df, partydireccion_df):
window_spec = Window.partitionBy("b.coddistrito").orderBy(length(col("desdir")).desc())
df_clienteoperaciones_dir_01 = (clientesolicitante_df.alias("a")
.join(partydireccion_df.alias("b"),
(col("a.codclavepartycli") == col("b.codclaveparty")) & (
col("a.tiprolcli") == col("b.tiprol")), "inner")
.select(
col("a.codclavepartycli"),
col("a.tiprolcli"),
col("b.codprovincia"),
col("b.coddepartamento"),
col("b.coddistrito"),
row_number().over(window_spec).alias("rn"))
.filter(col("rn") == 1))
return df_clienteoperaciones_dir_01
def __select_ubicaciones_clientes(self, clienteoperaciones_dir_df):
df_ubicaciones = clienteoperaciones_dir_df.select(col("codprovincia"), col("coddepartamento"),
col("coddistrito"))
.dropDuplicates(["codprovincia", "coddepartamento", "coddistrito"])
return df_ubicaciones
def __gen_ro_1_gora(self, ro_operacionessbs_unido_df, clienteoperaciones_nit_df, ubicaciones_df,
relacionoficinabcpoperacionsospechosa_df, descanal_df, descanalros_df,
relacionbancoentidadoperacionsospechosa_df,
relaciondocumentobcpoperacionsospechosa_df,bancoexterior_uni_df):
df_tp_ro_1_gora = (ro_operacionessbs_unido_df.alias("a")
.join(broadcast(relaciondocumentobcpoperacionsospechosa_df).alias("a1"),
col("a.tippartyidentificacionbeneficiario") == col("a1.tippartyidentificacion"), "left")
.join(broadcast(relaciondocumentobcpoperacionsospechosa_df).alias("a2"),
col("a.tippartyidentificacionordenante") == col("a2.tippartyidentificacion"), "left")
.join(broadcast(relaciondocumentobcpoperacionsospechosa_df).alias("a3"),
col("a.tippartyidentificacionsolicitante") == col("a3.tippartyidentificacion"), "left")
.join(clienteoperaciones_nit_df.alias("z"),
(col("a.codclavepartyclibeneficiario") == col("z.codclavepartycli")) & (
col("a.tiprolclibeneficiario") == col("z.tiprolcli")), "left")
.join(clienteoperaciones_nit_df.alias("y"),
(col("a.codclavepartycliordenante") == col("y.codclavepartycli")) & (
col("a.tiprolcliordenante") == col("y.tiprolcli")), "left")
.join(clienteoperaciones_nit_df.alias("x"),
(col("a.codclavepartyclisolicitante") == col("x.codclavepartycli")) & (
col("a.tiprolclisolicitante") == col("x.tiprolcli")), "left")
.join(broadcast(ubicaciones_df).alias("b"), col("a.coddistritosolicitante") == col("b.coddistrito"), "left")
.join(broadcast(ubicaciones_df).alias("c"), col("a.coddistritoordenante") == col("c.coddistrito"), "left")
.join(broadcast(ubicaciones_df).alias("d"), col("a.coddistritobeneficiario") == col("d.coddistrito"),
"left")
.join(broadcast(relacionoficinabcpoperacionsospechosa_df).alias("e"),
col("a.codsucage") == col("e.codsucage"), "left")
.join(broadcast(descanal_df).alias("f"), col("a.codcanal") == col("f.codcanal"), "left")
.join(broadcast(descanalros_df).alias("f1"), col("a.codcanal") == col("f1.codcanal"), "left")
.join(broadcast(relacionbancoentidadoperacionsospechosa_df).alias("g"),
col("a.codbco_ord") == col("g.codbcobcp"), "left")
.join(broadcast(relacionbancoentidadoperacionsospechosa_df).alias("h"),
col("a.codbco_ben") == col("h.codbcobcp"), "left")
.join(bancoexterior_uni_df.alias("i"),
col("a.codswiftbcoexterioremisor") == col("i.codswiftbcoexterior"), "left")
.join(bancoexterior_uni_df.alias("j"),
col("a.codswiftbcoexteriordestino") == col("j.codswiftbcoexterior"), "left")
.select(
col("e.codofisbs"),
col("a.numregistroopebcp"),
col("a.codmes"),
col("a.codsecuencial"),
col("a.codclavectaordenante"),
col("a.codcanal"),
col("a.descanal"),
col("z.codnit").alias("codnit_beneficiario"),
col("y.codnit").alias("codnit_ordenante"),
col("x.codnit").alias("codnit_solicitante"),
col("a.codclaveunicoclibeneficiario"),
col("a.codclaveunicocliordenante"),
col("a.codclaveunicoclisolicitante"),
col("a.codclavectabeneficiario"),
col("a.codclavectasolicitante"),
col("a.codinternocomputacionalbeneficiario"),
col("a.codinternocomputacionalordenante"),
col("a.codinternocomputacionalsolicitante"),
col("a.tiprolclibeneficiario"),
col("a.tiprolcliordenante"),
col("a.tiprolclisolicitante"),
col("a.codclavepartyclibeneficiario"),
col("a.codclavepartycliordenante"),
col("a.codclavepartyclisolicitante"),
col("a.codmoneda"),
coalesce(col("a.codubigeo"), lit('150114')).alias("codubigeo"),
col("a.codappros"),
col("a.codtrxbcp"),
col("a.fecdia"),
col("a.flgtrxcompensacionelectronica"),
when(col("a.flgtrxcompensacionelectronica") == self.S_CONS, lit("1"))
.when(col("a.flgtrxcompensacionelectronica") == self.N_CONS, lit("2"))
.otherwise(col("a.flgtrxcompensacionelectronica")).alias("flg_cce_intermediario"),
col("a.flgtrxinternacional"),
when(col("a.flgtrxinternacional") == self.S_CONS, lit("2"))
.when(col("a.flgtrxinternacional") == self.N_CONS, lit("1"))
.otherwise(col("a.flgtrxinternacional")).alias("flg_internacional_alcance"),
col("a.flgumbraltrx"),
col("a.hortrx"),
col("a.mtocambiodolarros"),
col("a.mtotrx"),
col("a.mtotrxdol"),
col("a.mtodestino"),
col("a.numtrxapp"),
col("a.tipopereportesbs"),
col("a.codmonedadestino"),
col("a.codcargoclibeneficiario"),
col("a.codcargocliordenante"),
col("a.codcargoclisolicitante"),
col("a.codclavepartycliperbeneficiario"),
col("a.codclavepartycliperordenante"),
col("a.codclavepartyclipersolicitante"),
col("a.coddistritobeneficiario"),
col("a.coddistritoordenante"),
col("a.coddistritosolicitante"),
when(
(trim(col("a.codappros")) == "REMI") & (substring(trim(col("a.codpaisdestinosbs")), 1, 2).isNotNull()),
substring(trim(col("a.codpaisdestinosbs")), 1, 2)
).when(
col("a.codpaisdestinosbs").isNotNull(),
col("a.codpaisdestinosbs")
).when(
(trim(col("a.codappros")).isin(["GGTT", "REMI", "NTLC"])) & (
col("a.flgtrxinternacional") == self.S_CONS),
substring(trim(col("a.codswiftbcoexteriordestino")), 5, 2)
).when(
trim(col("a.codappros")) == "GGTT",
substring(trim(coalesce(col("a.codswiftbcoexteriordestino"), lit(" "))), 5, 2)
).when(
(trim(col("a.codappros")) == "BSIO") & (
col("a.tipopereportesbs").isin(["27", "28", "29", "30", "31", "32"])),
substring(trim(coalesce(col("a.codswiftbcoexteriordestino"), lit(" "))), 5, 2)
).when(
trim(col("a.codappros")) == "BKTD",
substring(trim(coalesce(col("a.codswiftbcoexteriordestino"), lit(" "))), 5, 2)
).otherwise(lit(" ")).alias("codpaisdestinosbs"),
when(
(trim(col("a.codappros")).isin("REMI", "NTLC")) & (col("a.flgtrxinternacional") == self.S_CONS),
substring(trim(col("a.codswiftbcoexterioremisor")), 5, 2)
).when(
trim(col("a.codappros")) == "GGTT",
substring(trim(coalesce(col("a.codswiftbcoexterioremisor"), lit(" "))), 5, 2)
).when(
(trim(col("a.codappros")) == "BSIO") & (
col("a.tipopereportesbs").isin(["27", "28", "29", "30", "31", "32"])),
substring(trim(coalesce(col("a.codswiftbcoexterioremisor"), lit(" "))), 5, 2)
).when(
trim(col("a.codappros")) == "BKTD",
lit("PE")
).when(
col("a.codpaisemisorsbs").isNotNull(),
col("a.codpaisemisorsbs")
).otherwise(lit(" ")).alias("codpaisemisorsbs"),
col("a.codswiftbcoexterioremisor"),
col("a.codswiftbcoexteriordestino"),
col("a.numcheque"),
col("a.desorigendinero"),
col("a.nbrocupacionclibeneficiario"),
col("a.nbrocupacioncliordenante"),
col("a.nbrocupacionclisolicitante"),
col("a.flgumbralbeneficiario"),
col("a.flgumbralordenante"),
col("a.flgumbralsolicitante"),
col("a.codclavepartyage"),
col("a.tipclibeneficiario"),
col("a.tipcliordenante"),
col("a.tipclisolicitante"),
coalesce(col("a1.tippartyidentificacionsbs"), lit('9')).alias("tippartyidentificacionbeneficiario"),
coalesce(col("a2.tippartyidentificacionsbs"), lit('9')).alias("tippartyidentificacionordenante"),
coalesce(col("a3.tippartyidentificacionsbs"), lit('9')).alias("tippartyidentificacionsolicitante"),
col("a.tipindicadorlavadoactivo"),
col("a.codpais_beneficiario"),
col("a.codpais_ordenante"),
col("a.codpais_solicitante"),
col("a.mtotrx_sbs"),
col("a.codmoneda_sbs"),
col("a.mtotrx_sbs_dol"),
col("a.mtocambioalnuevosol"),
coalesce(col("a.codprofesion_beneficiario"), lit('999')).alias("codprofesion_beneficiario"),
coalesce(col("a.codprofesion_ordenante"), lit('999')).alias("codprofesion_ordenante"),
coalesce(col("a.codprofesion_solicitante"), lit('999')).alias("codprofesion_solicitante"),
col("a.codsucage"),
col("a.flgtrxefectivo"),
col("a.flgresidentesolicitante"),
col("a.flgresidenteordenante"),
col("a.flgresidentebeneficiario"),
col("a.tiprelpersonabcosolicitante"),
col("a.tiprelpersonabcoordenante"),
col("a.tiprelpersonabcobeneficiario"),
substring(trim(col("a.desorigendinero")), 1, 79).alias("desorigenfondos"),
col("a.m_tipmancomuno"),
col("a.m_origenro"),
col("a.m_codclavecta"),
col("a.m_numcodclavepartycli_o"),
col("a.m_numcodclavepartycli_b"),
col("a.m_codclavepartycli_ori"),
col("a.m_codclaveunicocli_ori"),
col("a.codclavepartyclisolicitante").alias("key_codclavepartyclisolicitante"),
col("a.codclavepartycliordenante").alias("key_codclavepartycliordenante"),
col("a.codclavepartyclibeneficiario").alias("key_codclavepartyclibeneficiario"),
col("a.codclaveunicoclisolicitante").alias("key_codclaveunicoclisolicitante"),
col("a.codclaveunicocliordenante").alias("key_codclaveunicocliordenante"),
col("a.codclaveunicoclibeneficiario").alias("key_codclaveunicoclibeneficiario"),
col("a.tipctaordinario"),
col("a.tipcargoabono"),
col("a.tipctabeneficiario"),
col("a.codbco_ord"),
col("a.codbco_ben"),
col("b.codprovincia").alias("codprovinciasolicitante"),
col("b.coddepartamento").alias("coddepartamentosolicitante"),
col("c.codprovincia").alias("codprovinciaordenante"),
col("c.coddepartamento").alias("coddepartamentoordenante"),
col("d.codprovincia").alias("codprovinciabeneficiario"),
col("d.coddepartamento").alias("coddepartamentobeneficiario"),
when(trim(col("a.codappros")).isin(["PIA", "BSIO"]), lit("1"))
.otherwise(lit("2")).alias("codcanal_mod"),
when(trim(col("a.codappros")).isin(["PIA", "BSIO"]), lit(" "))
.when(col("a.codcanal").isin(["1000", "1019", "1020"]), lit("OTROS"))
.when(trim(col("a.codappros")) == "IOAP", lit("IO"))
.otherwise(coalesce(col("f1.descanal"), col("f.descanal"), lit("OTROS"))).alias("descanal_mod"),
lit('').alias("nroparche"),
when((trim(col("a.tipopereportesbs")) == "26") & (trim(col("a.codappros")) == "TTIB"), col("g.codbcosbs"))
.when(trim(col("a.tipopereportesbs")) == "9", col("g.codbcosbs"))
.otherwise(coalesce(col("g.codbcosbs"), lit(""))).alias("codcuentas_orden_1"),
when((trim(col("a.tipopereportesbs")) == "26") & (trim(col("a.codappros")) == "TTIB"), lit("1"))
.when((trim(col("a.tipopereportesbs")) == "99") & (trim(col("a.codappros")) == "IOAP"), lit("4"))
.when((trim(col("a.tipopereportesbs")) == "17") & (col("a.flgtrxefectivo") == "1"), lit(None))
.when((trim(col("a.tipopereportesbs")) == "24") & (trim(col("a.codappros")) == "APOB"), lit("3"))
.when((trim(col("a.tipopereportesbs")) == "17") & (col("a.flgtrxefectivo") == "2"), lit("1"))
.otherwise(
when(length(trim(col("a.codclavectaordenante"))) > 2, coalesce(col("a.tipctaordinario"), lit("")))
.otherwise("4")
).alias("codcuentas_orden_2"),
when(
(trim(col("a.tipopereportesbs")).isin(["29", "30", "31", "32"])) &
(trim(col("a.codappros")).isin(["GGTT", "REMI", "BKTD"])),
coalesce(col("i.nbrbcoexterior"), lit(""))
).otherwise(coalesce(col("i.nbrbcoexterior"), lit(""))).alias("codcuentas_orden_4"),
coalesce(col("h.codbcosbs"), lit("")).alias("codcuentas_benef_1"),
when(trim(col("a.tipopereportesbs")) == "9", lit("2"))
.when(trim(col("a.tipopereportesbs")) == "2", lit("1"))
.when((trim(col("a.tipopereportesbs")) == "17") & (col("a.flgtrxefectivo") == "1"), lit("2"))
.when((trim(col("a.tipopereportesbs")) == "17") & (col("a.flgtrxefectivo") == "2"), lit("2"))
.when(trim(col("a.tipopereportesbs")) == "7", lit("2"))
.when((trim(col("a.tipopereportesbs")) == "12") & (col("a.codappros").isin(["BSIO", "CSCO"])), lit("1"))
.when((trim(col("a.tipopereportesbs")) == "24") & (trim(col("a.codappros")) == "APOB"), lit("3"))
.when(length(trim(col("a.codclavectabeneficiario"))) > 2, coalesce(col("a.tipctabeneficiario"), lit("")))
.otherwise("4").alias("codcuentas_benef_2"),
coalesce(col("j.nbrbcoexterior"), lit("")).alias("codcuentas_benef_4")
))
return df_tp_ro_1_gora
![]() |
Notes is a web-based application for online taking notes. You can take your notes and share with others people. If you like taking long notes, notes.io is designed for you. To date, over 8,000,000,000+ notes created and continuing...
With notes.io;
- * You can take a note from anywhere and any device with internet connection.
- * You can share the notes in social platforms (YouTube, Facebook, Twitter, instagram etc.).
- * You can quickly share your contents without website, blog and e-mail.
- * You don't need to create any Account to share a note. As you wish you can use quick, easy and best shortened notes with sms, websites, e-mail, or messaging services (WhatsApp, iMessage, Telegram, Signal).
- * Notes.io has fabulous infrastructure design for a short link and allows you to share the note as an easy and understandable link.
Fast: Notes.io is built for speed and performance. You can take a notes quickly and browse your archive.
Easy: Notes.io doesn’t require installation. Just write and share note!
Short: Notes.io’s url just 8 character. You’ll get shorten link of your note when you want to share. (Ex: notes.io/q )
Free: Notes.io works for 14 years and has been free since the day it was started.
You immediately create your first note and start sharing with the ones you wish. If you want to contact us, you can use the following communication channels;
Email: [email protected]
Twitter: http://twitter.com/notesio
Instagram: http://instagram.com/notes.io
Facebook: http://facebook.com/notesio
Regards;
Notes.io Team
