-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathmain.py
308 lines (241 loc) · 12 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
import datetime
import json
import os
from json import JSONDecodeError
from os import path
from sys import argv
from time import time
from typing import List
import statistics
from custom_types import NamedConversation
from utils import separator
class FacebookStatistics:
    """
    Main entry-point.

    Parses the message threads of an unzipped Facebook export located at
    `root_path` and runs the generators from the `statistics` module on them.
    A parsed conversation is a tuple `(title, participants, messages)` where
    every message is a tuple `(sender_name, content, datetime)`.
    """

    # Translation table for `str.translate` which deletes every character with
    # code point 0-31 (ASCII control characters, including line breaks).
    _CONTROL_CHARS = dict.fromkeys(range(32))

    def __init__(self, root_path: str, encoding: str = 'utf-8', exclude_group_chats=True, exhaustive_lists=False,
                 ignore_facebook_user=True):
        """
        Stores settings, prints them and determines the name of the archive owner.

        :param root_path: directory of the unzipped export (contains sub-folder `messages`)
        :param encoding: stored on the instance; not used by any method of this class
        :param exclude_group_chats: skip conversations with more than two participants
        :param exhaustive_lists: forwarded to the statistics generators that accept it
        :param ignore_facebook_user: drop messages whose sender name is empty
        """
        self.root_path = root_path
        self.encoding = encoding

        # Settings.
        self.exclude_group_chats = exclude_group_chats
        self.exhaustive_lists = exhaustive_lists
        self.ignore_facebook_user = ignore_facebook_user
        self.print_settings()

        # Data
        self.my_name: str = None
        self.conversations: List[NamedConversation] = []

        self.parse_my_name()

    def print_settings(self) -> None:
        """
        Prints current settings to standard output.
        """
        print('Setting: Exclude Group Chats: ', self.exclude_group_chats)
        print('Setting: Ignore Facebook User: ', self.ignore_facebook_user)
        print('Setting: Exhaustive lists: ', self.exhaustive_lists)

    @staticmethod
    def _read_export_text(file_path: str) -> str:
        """
        Reads one JSON file of the export and returns its repaired text.

        Facebook export tool produces invalid JSONs. Wrongly encoded characters
        are fixed by a raw_unicode_escape round trip followed by a UTF-8 decode.
        Some control characters are not encoded correctly either, so all of them
        are removed - it is safe to remove line breaks as JSON is valid without
        any whitespace.

        :param file_path: path of the file to read
        :return: repaired text, ready to be passed to `json.loads`
        """
        with open(file_path, encoding='raw_unicode_escape') as f:
            raw = f.read()
        decoded = raw.encode('raw_unicode_escape').decode()
        return decoded.translate(FacebookStatistics._CONTROL_CHARS)

    def parse_my_name(self) -> None:
        """
        Parses name of person whose archive is being processed.
        Name is then stored as field `my_name`.

        When the export has no `profile_information` folder the user is asked
        for the name interactively. Terminates the program with exit status 1
        (raises `SystemExit`) when the profile file is not valid JSON.
        """
        profile_dir = path.join(self.root_path, 'profile_information')
        if path.isdir(profile_dir):
            print('Parsing profile...')
            decoded = self._read_export_text(path.join(profile_dir, 'profile_information.json'))
            try:
                doc = json.loads(decoded)
            except JSONDecodeError as e:
                print(">>>>> JSON DECODE ERROR in profile_information")
                print(e)
                # SystemExit is what the `exit()` helper raises; raising it directly
                # does not depend on the `site` module being loaded.
                raise SystemExit(1)
            self.my_name = doc['profile']['name']['full_name']
        else:
            separator()
            print('Profile Information section is not included in this export!')
            print('Please provide your name (exactly as on Facebook) so we can ' +
                  'differentiate your messages from messages of your friends.')
            self.my_name = input('Your name (exactly as on Facebook): ').strip()
            separator()

        print(f'Person name: {self.my_name}')

    def parse_all_messages(self) -> None:
        """
        Lists all threads in messages folder and parses each folder as one thread.

        Parsed conversations are appended to `self.conversations`. Conversations
        with fewer than two participants are always skipped; conversations with
        more than two participants are skipped when `exclude_group_chats` is set.
        """
        messages_root = path.join(self.root_path, 'messages')
        parsed_count = 0
        time_start = time()

        for subfolder in sorted(os.listdir(messages_root)):
            # facebook started putting stickers used in conversations into
            # folder called 'stickers_used' which is placed alongside the threads.
            if subfolder == 'stickers_used':
                continue

            subfolder_path = path.join(messages_root, subfolder)
            # Regular files placed next to the thread folders cannot be listed.
            if not path.isdir(subfolder_path):
                continue

            folders = sorted(os.listdir(subfolder_path))
            conversation_count = len(folders)
            print(f'Found {conversation_count} threads in {subfolder}')

            for index, file in enumerate(folders, start=1):
                # Verify if the message file exists.
                if not path.exists(path.join(subfolder_path, file, 'message_1.json')):
                    print(f'Warning: No message_1.json file for thread {file}! Skipping.')
                    continue

                print(f'({index}/{conversation_count}) Parsing thread {file}...')
                named_conversation = self.parse_conversation(path.join(subfolder, file))
                if named_conversation is None:
                    continue
                parsed_count += 1

                # Exclude conversation with self and group conversations if setting is enabled
                participant_count = len(named_conversation[1])
                if participant_count < 2:
                    continue
                if self.exclude_group_chats and participant_count > 2:
                    continue
                self.conversations.append(named_conversation)

        print(f'Parsed {parsed_count} conversations in {time() - time_start} seconds.')

    def parse_conversation(self, thread_dir: str) -> NamedConversation:
        """
        Parses conversation from JSON files located in directory specified by thread_dir
        parameter and returns its participants, title and messages.

        Title and participants are taken from the first successfully parsed file
        (in sorted file-name order); messages of all files are merged and ordered
        by their timestamp.

        :param thread_dir: directory to parse conversation from (AdamSulko_3c954401d0 for example),
                           relative to the `messages` folder
        :return: parsed conversation or None if no file could be parsed
        """
        # absolute path to the thread directory
        thread_path = path.join(self.root_path, 'messages', thread_dir)

        parsed_files: List[NamedConversation] = []
        # os.listdir returns entries in arbitrary order, sorting makes the result deterministic
        for name in sorted(os.listdir(thread_path)):
            name_path = path.join(thread_path, name)
            # only regular JSON files are message files; skip sub-folders and stray files
            if not name.lower().endswith('.json') or not path.isfile(name_path):
                continue

            parsed = self.parse_file(name_path)
            if parsed is not None:
                parsed_files.append(parsed)

        # if none of the files have been successfully parsed return None
        if not parsed_files:
            return None

        title, participants, messages = parsed_files[0]
        for other in parsed_files[1:]:
            messages.extend(other[2])

        # Every file is chronological on its own, the concatenation of files is not.
        # The sort is stable, so messages with equal timestamps keep their order.
        messages.sort(key=lambda message: message[2])
        return title, participants, messages

    def parse_file(self, path: str) -> NamedConversation:
        """
        Parses one message file of a thread.

        :param path: path of the JSON file to parse
        :return: tuple `(title, participants, messages)` or None when the file is not
                 valid JSON (the repaired text is then written to `error_file.json`
                 in the current working directory)
        """
        decoded = self._read_export_text(path)

        try:
            doc = json.loads(decoded)
        except JSONDecodeError as e:
            print(">>>>> JSON DECODE ERROR")
            print(e)
            with open('error_file.json', mode='w', encoding='utf-8') as g:
                g.write(decoded)
            return None

        declared = doc.get('participants')
        participants = [participant['name'] for participant in declared] if declared else []
        # Build participants list from senders if the JSON document does not contain this information.
        infer_participants = not declared

        messages = []
        # Convert JSON message objects to correct structure; the file is iterated in reverse.
        for msg in reversed(doc['messages']):
            sender_name = msg.get('sender_name', '')
            if sender_name == '' and self.ignore_facebook_user:
                continue

            if infer_participants and sender_name not in participants:
                participants.append(sender_name)

            messages.append((
                sender_name,
                msg.get('content', ''),
                datetime.datetime.fromtimestamp(msg.get('timestamp_ms') / 1000)
            ))

        return doc.get('title', ' '.join(participants)), participants, messages

    def all_stats(self, conversations: List[NamedConversation]):
        """
        Runs all statistics for specified list of conversations.

        :param conversations: list of conversation to run statistics generators on
        :return:
        """
        generators = [
            self.global_stats, self.top_conversations_by_chars, self.top_conversations_by_messages,
            self.conversation_people_variability, self.hourly_histogram, self.years_histogram,
            self.day_in_week_histogram, self.msg_lenghts, self.msgs_before_reply, self.time_before_reply,
            self.who_started_conv, self.most_used_words,
        ]
        for func in generators:
            separator()
            func(conversations)

    def all_global_stats(self):
        """
        Runs all statistics for every conversation parsed so far.
        """
        self.all_stats(self.conversations)

    # =============================================================
    # Shortcut methods for generating different statistics for this
    # archive and user.
    # =============================================================

    def global_stats(self, conversations: List[NamedConversation]):
        """Delegates to `statistics.general_stats`."""
        statistics.general_stats(self.my_name, conversations)

    def hourly_histogram(self, conversations: List[NamedConversation]):
        """Delegates to `statistics.hourly_histogram`."""
        statistics.hourly_histogram(conversations)

    def years_histogram(self, conversations: List[NamedConversation]):
        """Delegates to `statistics.yearly_histogram`."""
        statistics.yearly_histogram(conversations)

    def day_in_week_histogram(self, conversations: List[NamedConversation]):
        """Delegates to `statistics.day_in_week_histogram`."""
        statistics.day_in_week_histogram(conversations)

    def msg_lenghts(self, conversations: List[NamedConversation]):
        """Delegates to `statistics.messages_lengths`."""
        statistics.messages_lengths(self.my_name, conversations)

    def top_conversations_by_chars(self, conversations: List[NamedConversation]):
        """Delegates to `statistics.top_conversations_by_chars`."""
        statistics.top_conversations_by_chars(self.my_name, conversations, self.exhaustive_lists)

    def top_conversations_by_messages(self, conversations: List[NamedConversation]):
        """Delegates to `statistics.top_conversations_by_messages`."""
        statistics.top_conversations_by_messages(self.my_name, conversations, self.exhaustive_lists)

    def conversation_people_variability(self, conversations: List[NamedConversation]):
        """Delegates to `statistics.conversation_people_variability`."""
        statistics.conversation_people_variability(self.my_name, conversations)

    def msgs_before_reply(self, conversations: List[NamedConversation]):
        """Delegates to `statistics.msgs_before_reply`."""
        statistics.msgs_before_reply(self.my_name, conversations)

    def time_before_reply(self, conversations: List[NamedConversation]):
        """Delegates to `statistics.time_before_reply`."""
        statistics.time_before_reply(self.my_name, conversations)

    def most_used_words(self, conversations: List[NamedConversation]):
        """Delegates to `statistics.most_used_words`."""
        statistics.most_used_words(self.my_name, conversations, self.exhaustive_lists)

    def who_started_conv(self, conversations: List[NamedConversation]):
        """Delegates to `statistics.who_started_conv`."""
        statistics.who_started_conv(self.my_name, conversations)
if __name__ == '__main__':
    # Interactive entry point: asks for (or takes from the first command line
    # argument) the export root, parses all threads, prints global statistics
    # and then statistics of every conversation with at least 100 messages.
    print('You invoked script as interactive shell.')
    separator()
    print('Please enter path to unzipped Facebook export ' +
          'directory (the one which contains sub-folder `messages`).')

    # Check if user provided argument and wants to use it as root folder for
    # generating statistics from.
    if len(argv) > 1 and len(argv[1]) > 0:
        p = argv[1]
        print('Using provided argument as path: ', argv[1])
    else:
        # Surrounding whitespace of a typed or pasted path is never part of it.
        p = input('Export root: ').strip()

    # Verify that provided path is valid Facebook export archive by checking
    # the presence of the `messages` folder.
    if not path.isdir(path.join(p, 'messages')):
        separator()
        print('Error: Provided path does not contain required sub-folder messages!')
        # Raised directly instead of calling `exit()`, which is only provided
        # by the `site` module.
        raise SystemExit(1)

    # Everything seems to be alright so let's start parsing everything.
    separator()
    stats = FacebookStatistics(p)
    stats.parse_all_messages()

    # Generate global statistics.
    stats.all_global_stats()

    print('\n\n')
    print('Printing statistics for each conversation. Conversations with less than 100 messages will be skipped.')
    print('\n\n')

    for conversation in stats.conversations:
        if len(conversation[2]) < 100:
            continue
        print('\n\n')
        print('+============================================================+')
        print(f'|{conversation[0]:^60}|')
        print('|============================================================|')
        stats.all_stats([conversation])