forked from databricks/Spark-The-Definitive-Guide
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathAdvanced_Analytics_and_Machine_Learning-Chapter_30_Graph_Analysis.py
126 lines (72 loc) · 3.12 KB
/
Advanced_Analytics_and_Machine_Learning-Chapter_30_Graph_Analysis.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# Load the bike-share station and trip CSV files. The "header" option makes
# Spark take the column names from the first row of each file.
bikeStations = (
    spark.read
    .option("header", "true")
    .csv("/data/bike-data/201508_station_data.csv")
)
tripData = (
    spark.read
    .option("header", "true")
    .csv("/data/bike-data/201508_trip_data.csv")
)
# COMMAND ----------
# Vertices: de-duplicated station rows, with the station "name" column
# renamed to "id" so it can serve as the vertex identifier.
stationVertices = (
    bikeStations
    .withColumnRenamed("name", "id")
    .distinct()
)
# Edges: one row per trip, with the start/end station columns renamed to
# "src" and "dst".
tripEdges = (
    tripData
    .withColumnRenamed("Start Station", "src")
    .withColumnRenamed("End Station", "dst")
)
# COMMAND ----------
from graphframes import GraphFrame

# Build the graph with stations as vertices and trips as edges, then cache it.
stationGraph = GraphFrame(
    stationVertices,
    tripEdges,
)
stationGraph.cache()
# COMMAND ----------
# Report the size of the graph next to the size of the raw trip data. The
# edges are the trip rows with two columns renamed, so the last two counts
# are expected to agree.
#
# print is called as a function so this cell runs on Python 3; with a single
# parenthesized argument it prints the same text on Python 2 as well.
print("Total Number of Stations: " + str(stationGraph.vertices.count()))
print("Total Number of Trips in Graph: " + str(stationGraph.edges.count()))
print("Total Number of Trips in Original Data: " + str(tripData.count()))
# COMMAND ----------
from pyspark.sql.functions import desc

# Count trips per (start station, end station) pair and show the ten most
# frequent routes.
tripCountsByRoute = stationGraph.edges.groupBy("src", "dst").count()
tripCountsByRoute.orderBy(desc("count")).show(10)
# COMMAND ----------
# Ten most frequent routes among trips that start or end at
# 'Townsend at 7th'.
townsendTrips = stationGraph.edges.where(
    "src = 'Townsend at 7th' OR dst = 'Townsend at 7th'"
)
(
    townsendTrips
    .groupBy("src", "dst")
    .count()
    .orderBy(desc("count"))
    .show(10)
)
# COMMAND ----------
# Keep only the edges that touch 'Townsend at 7th' and pair them with the
# full vertex set to form a subgraph.
townAnd7thEdges = stationGraph.edges.where(
    "src = 'Townsend at 7th' OR dst = 'Townsend at 7th'"
)
subgraph = GraphFrame(
    stationGraph.vertices,
    townAnd7thEdges,
)
# COMMAND ----------
# Motif: every directed triangle a -> b -> c -> a in the station graph.
motifs = stationGraph.find(
    "(a)-[ab]->(b); (b)-[bc]->(c); (c)-[ca]->(a)"
)
# COMMAND ----------
from pyspark.sql.functions import expr

# Parse the start time of each of the three trips so they can be compared.
# NOTE(review): the 'MM/dd/yyyy HH:mm' pattern may be parsed more strictly by
# newer Spark versions than by older ones -- confirm against the actual data.
legStartTimes = [
    "to_timestamp(ab.`Start Date`, 'MM/dd/yyyy HH:mm') as abStart",
    "to_timestamp(bc.`Start Date`, 'MM/dd/yyyy HH:mm') as bcStart",
    "to_timestamp(ca.`Start Date`, 'MM/dd/yyyy HH:mm') as caStart",
]
timedTriangles = motifs.selectExpr("*", *legStartTimes)

# Keep triangles ridden by a single bike, where consecutive stations differ
# and the three trips start in chronological order (ab, then bc, then ca).
sameBikeTriangles = (
    timedTriangles
    .where("ca.`Bike #` = bc.`Bike #`")
    .where("ab.`Bike #` = bc.`Bike #`")
    .where("a.id != b.id")
    .where("b.id != c.id")
    .where("abStart < bcStart")
    .where("bcStart < caStart")
)

# Sort by the gap between the first and last trip's start time and show the
# single triangle with the smallest gap, untruncated.
(
    sameBikeTriangles
    .orderBy(expr("cast(caStart as long) - cast(abStart as long)"))
    .selectExpr("a.id", "b.id", "c.id", "ab.`Start Date`", "ca.`End Date`")
    .limit(1)
    .show(1, False)
)
# COMMAND ----------
from pyspark.sql.functions import desc

# Run PageRank for 10 iterations with a reset probability of 0.15, then list
# the ten stations with the highest score.
ranks = stationGraph.pageRank(
    resetProbability=0.15,
    maxIter=10,
)
stationsByRank = ranks.vertices.orderBy(desc("pagerank"))
stationsByRank.select("id", "pagerank").show(10)
# COMMAND ----------
# In-degree: number of trips ending at each station. Show the top five,
# without truncating station names.
inDeg = stationGraph.inDegrees
(
    inDeg
    .orderBy(desc("inDegree"))
    .show(5, False)
)
# COMMAND ----------
# Out-degree: number of trips starting at each station. Show the top five,
# without truncating station names.
outDeg = stationGraph.outDegrees
(
    outDeg
    .orderBy(desc("outDegree"))
    .show(5, False)
)
# COMMAND ----------
# Ratio of incoming to outgoing trips per station. A high ratio means more
# trips end at the station than start there; a low ratio means the opposite.
degreeRatio = (
    inDeg
    .join(outDeg, "id")
    .selectExpr("id", "double(inDegree)/double(outDegree) as degreeRatio")
)
# Ten highest ratios first, then the ten lowest.
highestRatioFirst = degreeRatio.orderBy(desc("degreeRatio"))
highestRatioFirst.show(10, False)
lowestRatioFirst = degreeRatio.orderBy("degreeRatio")
lowestRatioFirst.show(10, False)
# COMMAND ----------
# Breadth-first search from 'Townsend at 7th' to 'Spear at Folsom', following
# at most two edges; show up to ten of the paths found.
townsendToSpearPaths = stationGraph.bfs(
    fromExpr="id = 'Townsend at 7th'",
    toExpr="id = 'Spear at Folsom'",
    maxPathLength=2,
)
townsendToSpearPaths.show(10)
# COMMAND ----------
# Set a checkpoint directory before running connected components.
# NOTE(review): GraphFrames' default connectedComponents algorithm is
# documented as requiring one -- confirm for the version in use.
spark.sparkContext.setCheckpointDir("/tmp/checkpoints")
# COMMAND ----------
# Build a smaller graph from a ~10% sample of the trips (sampled without
# replacement, no fixed seed) and compute its connected components.
sampledTripEdges = tripEdges.sample(False, 0.1)
minGraph = GraphFrame(stationVertices, sampledTripEdges)
cc = minGraph.connectedComponents()
# COMMAND ----------
# Show the vertices whose component id is not 0.
(
    cc
    .where("component != 0")
    .show()
)
# COMMAND ----------
# Strongly connected components of the sampled graph, limited to three
# iterations of the algorithm.
scc = minGraph.stronglyConnectedComponents(
    maxIter=3
)
# COMMAND ----------