Skip to content

Commit

Permalink
Para poder ler arquivo dividido em varios zips
Browse files Browse the repository at this point in the history
Adaptação para suportar nova forma de disponibilização dos dados pela RF, em multiplos arquivos zip.
Colocar todos os arquivos em uma pasta e usar com opção --dir
  • Loading branch information
fabioserpa authored Jun 11, 2019
1 parent 660284e commit 9b2fceb
Showing 1 changed file with 158 additions and 126 deletions.
284 changes: 158 additions & 126 deletions cnpj.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- encoding: utf-8 -*-
import os
import glob
import sys
import csv
import datetime
Expand Down Expand Up @@ -175,135 +176,146 @@

NOME_ARQUIVO_SQLITE = 'CNPJ_full.db'

def cnpj_full(input_path, tipo_output, output_path):
def cnpj_full(input_list, tipo_output, output_path):
total_empresas = 0
controle_empresas = 0
total_socios = 0
controle_socios = 0
total_cnaes = 0
controle_cnaes = 0

if not os.path.exists(output_path):
os.makedirs(output_path)

if tipo_output == 'sqlite':
import sqlite3
conBD = sqlite3.connect(os.path.join(output_path,NOME_ARQUIVO_SQLITE))

dados = read_cfwf(input_path,
type_width=1,
colspecs= {'0':HEADER_COLSPECS,
'1':EMPRESAS_COLSPECS,
'2':SOCIOS_COLSPECS,
'6':CNAES_COLSPECS,
'9':TRAILLER_COLSPECS},
names={'0':HEADER_COLUNAS,
'1':EMPRESAS_COLUNAS,
'2':SOCIOS_COLUNAS,
'6':CNAES_COLUNAS,
'9':TRAILLER_COLUNAS},
dtype={'1': EMPRESAS_DTYPE,
'2': SOCIOS_DTYPE},
chunksize=CHUNKSIZE)

for i, dado in enumerate(dados):
print('Processando bloco {}: até linha {}.'.format(i+1,(i+1)*CHUNKSIZE), end='\r')

for tipo_registro, df in dado.items():

if tipo_registro == '1': # empresas
total_empresas += len(df)

# Troca datas zeradas por vazio
df['data_opc_simples'] = (df['data_opc_simples']
.where(df['data_opc_simples'] != '00000000',''))
df['data_exc_simples'] = (df['data_exc_simples']
.where(df['data_exc_simples'] != '00000000',''))
df['data_sit_especial'] = (df['data_sit_especial']
.where(df['data_sit_especial'] != '00000000',''))

elif tipo_registro == '2': # socios
total_socios += len(df)

# Troca cpf invalido por vazio
df['cpf_repres'] = (df['cpf_repres']
.where(df['cpf_repres'] != '***000000**',''))
df['nome_repres'] = (df['nome_repres']
.where(df['nome_repres'] != 'CPF INVALIDO',''))

# Se socio for tipo 1 (cnpj), deixa campo intacto, do contrario,
# fica apenas com os ultimos 11 digitos
df['cnpj_cpf_socio'] = (df['cnpj_cpf_socio']
.where(df['tipo_socio'] == '1',
df['cnpj_cpf_socio'].str[-11:]))

elif tipo_registro == '6': # cnaes_secundarios
total_cnaes += len(df)

# Verticaliza tabela de associacao de cnaes secundarios,
# mantendo apenas os validos (diferentes de 0000000)
df = pd.melt(df,
id_vars=[CNAES_COLUNAS[0]],
value_vars=range(99),
var_name='cnae_ordem',
value_name='cnae')

df = df[df['cnae'] != '0000000']

elif tipo_registro == '0': # header
print('\nINFORMACOES DO HEADER:')

header = df.iloc[0,:]

for k, v in header.items():
print('{}: {}'.format(k, v))

# Para evitar que tente armazenar dados de header
continue

elif tipo_registro == '9': # trailler
print('\nINFORMACOES DE CONTROLE:')

trailler = df.iloc[0,:]

controle_empresas = int(trailler['Total de registros de empresas'])
controle_socios = int(trailler['Total de registros de socios'])
controle_cnaes = int(trailler['Total de registros de CNAEs secundarios'])

print('Total de registros de empresas: {}'.format(controle_empresas))
print('Total de registros de socios: {}'.format(controle_socios))
print('Total de registros de CNAEs secundarios: {}'.format(controle_cnaes))
print('Total de registros incluindo header e trailler: {}'.format(
int(trailler['Total de registros incluindo header e trailler'])))

# Para evitar que tente armazenar dados de trailler
continue

if tipo_output == 'csv':
if i > 0:
replace_append = 'a'
header=False
else:
replace_append = 'w'
header=True

nome_arquivo_csv = REGISTROS_TIPOS[tipo_registro] + '.csv'
df.to_csv(os.path.join(output_path,nome_arquivo_csv),
header=header,
mode=replace_append,
index=False,
quoting=csv.QUOTE_NONNUMERIC)

elif tipo_output == 'sqlite':
replace_append = 'append' if i > 0 else 'replace'

df.to_sql(REGISTROS_TIPOS[tipo_registro],
con=conBD,
if_exists=replace_append,
index=False)
# Itera sobre sequencia de arquivos (p/ suportar arquivo dividido pela RF)
for i_arq, arquivo in enumerate(input_list):
print('Processando arquivo: {}'.format(arquivo))

dados = read_cfwf(arquivo,
type_width=1,
colspecs= {'0':HEADER_COLSPECS,
'1':EMPRESAS_COLSPECS,
'2':SOCIOS_COLSPECS,
'6':CNAES_COLSPECS,
'9':TRAILLER_COLSPECS},
names={'0':HEADER_COLUNAS,
'1':EMPRESAS_COLUNAS,
'2':SOCIOS_COLUNAS,
'6':CNAES_COLUNAS,
'9':TRAILLER_COLUNAS},
dtype={'1': EMPRESAS_DTYPE,
'2': SOCIOS_DTYPE},
chunksize=CHUNKSIZE)

# Itera sobre blocos (chunks) do arquivo
for i_bloco, bloco in enumerate(dados):
print('Processando bloco {}: até linha {}.'.format(i_bloco+1,
(i_bloco+1)*CHUNKSIZE),
end='\r')

for tipo_registro, df in bloco.items():

if tipo_registro == '1': # empresas
total_empresas += len(df)

# Troca datas zeradas por vazio
df['data_opc_simples'] = (df['data_opc_simples']
.where(df['data_opc_simples'] != '00000000',''))
df['data_exc_simples'] = (df['data_exc_simples']
.where(df['data_exc_simples'] != '00000000',''))
df['data_sit_especial'] = (df['data_sit_especial']
.where(df['data_sit_especial'] != '00000000',''))

elif tipo_registro == '2': # socios
total_socios += len(df)

# Troca cpf invalido por vazio
df['cpf_repres'] = (df['cpf_repres']
.where(df['cpf_repres'] != '***000000**',''))
df['nome_repres'] = (df['nome_repres']
.where(df['nome_repres'] != 'CPF INVALIDO',''))

# Se socio for tipo 1 (cnpj), deixa campo intacto, do contrario,
# fica apenas com os ultimos 11 digitos
df['cnpj_cpf_socio'] = (df['cnpj_cpf_socio']
.where(df['tipo_socio'] == '1',
df['cnpj_cpf_socio'].str[-11:]))

elif tipo_registro == '6': # cnaes_secundarios
total_cnaes += len(df)

# Verticaliza tabela de associacao de cnaes secundarios,
# mantendo apenas os validos (diferentes de 0000000)
df = pd.melt(df,
id_vars=[CNAES_COLUNAS[0]],
value_vars=range(99),
var_name='cnae_ordem',
value_name='cnae')

df = df[df['cnae'] != '0000000']

elif tipo_registro == '0': # header
print('\nINFORMACOES DO HEADER:')

header = df.iloc[0,:]

for k, v in header.items():
print('{}: {}'.format(k, v))

# Para evitar que tente armazenar dados de header
continue

elif tipo_registro == '9': # trailler
print('\nINFORMACOES DE CONTROLE:')

trailler = df.iloc[0,:]

controle_empresas = int(trailler['Total de registros de empresas'])
controle_socios = int(trailler['Total de registros de socios'])
controle_cnaes = int(trailler['Total de registros de CNAEs secundarios'])

print('Total de registros de empresas: {}'.format(controle_empresas))
print('Total de registros de socios: {}'.format(controle_socios))
print('Total de registros de CNAEs secundarios: {}'.format(controle_cnaes))
print('Total de registros incluindo header e trailler: {}'.format(
int(trailler['Total de registros incluindo header e trailler'])))

# Para evitar que tente armazenar dados de trailler
continue

if tipo_output == 'csv':
if (i_arq + i_bloco) > 0:
replace_append = 'a'
header=False
else:
replace_append = 'w'
header=True

nome_arquivo_csv = REGISTROS_TIPOS[tipo_registro] + '.csv'
df.to_csv(os.path.join(output_path,nome_arquivo_csv),
header=header,
mode=replace_append,
index=False,
quoting=csv.QUOTE_NONNUMERIC)

elif tipo_output == 'sqlite':
replace_append = 'append' if (i_arq + i_bloco) > 0 else 'replace'

df.to_sql(REGISTROS_TIPOS[tipo_registro],
con=conBD,
if_exists=replace_append,
index=False)


if tipo_output == 'sqlite':
conBD.close()

# Imprime totais
print('\nConversao concluída. Validando quantidades:')
print('\nConversao concluida. Validando quantidades:')

inconsistente = False

Expand All @@ -330,7 +342,7 @@ def cnpj_full(input_path, tipo_output, output_path):


if inconsistente:
print(u'Atenção! Foi detectada inconsistência entre as quantidades lidas e as informações de controle do arquivo.')
print(u'Atencao! Foi detectada inconsistencia entre as quantidades lidas e as informacoes de controle do arquivo.')

if tipo_output == 'csv':
print(u'Arquivos CSV gerados na pasta {}.'.format(output_path))
Expand Down Expand Up @@ -369,13 +381,20 @@ def cnpj_index(output_path):

def help():
print('''
Uso: python cnpj.py <arquivo_input> <output:csv|sqlite> <path_output> [--index]
Exemplo: python cnpj.py "data/F.K032001K.D81106D" sqlite "data" --index
Uso: python cnpj.py <path_input> <output:csv|sqlite> <path_output> [--dir] [--noindex]
Argumentos opcionais:
[--dir]: Indica que o <path_input> e uma pasta e pode conter varios ZIPs.
[--noindex]: NAO gera indices automaticamente no sqlite ao final da carga.
Exemplos: python cnpj.py "data/F.K032001K.D81106D" sqlite "output"
python cnpj.py "data" sqlite "output" --dir
python cnpj.py "data" sqlite "output" --dir --noindex
python cnpj.py "data" csv "output" --dir
''')


def main():
# python cnpj.py <arquivo_input> <output:csv|sqlite> <arquivo_output> [--index]

num_argv = len(sys.argv)
if num_argv < 4:
help()
Expand All @@ -385,24 +404,37 @@ def main():
tipo_output = sys.argv[2]
output_path = sys.argv[3]

gera_index = True
input_list = [input_path]

if num_argv > 4:
for opcional in sys.argv[4:num_argv]:
if (opcional == '--noindex'):
gera_index = False
elif (opcional == '--dir'):
input_list = glob.glob(os.path.join(input_path,'*.zip'))
input_list.sort()
else:
print(u'Argumento opcional inválido.')
help()
break

if tipo_output not in ['csv','sqlite']:
print('''
Erro: tipo de output inválido.
Escolha um dos seguintes tipos de output: csv ou sqlite.
''')

help()

else:
cnpj_full(input_path, tipo_output, output_path)
print('Iniciando processamento em {}'.format(datetime.datetime.now()))

cnpj_full(input_list, tipo_output, output_path)

# Possui argumento opcional
if num_argv > 4:
for opcional in sys.argv[4:num_argv]:
if (opcional == '--index') and (tipo_output == 'sqlite'):
cnpj_index(output_path)
if (gera_index) and (tipo_output == 'sqlite'):
cnpj_index(output_path)

print('Processamento concluido em {}'.format(datetime.datetime.now()))

if __name__ == "__main__":
print('Iniciando processamento em {}'.format(datetime.datetime.now()))
main()
print('Processamento concluido em {}'.format(datetime.datetime.now()))

0 comments on commit 9b2fceb

Please sign in to comment.