Skip to content

Commit

Permalink
Additional Updates to part 3 of Course
Browse files Browse the repository at this point in the history
  • Loading branch information
matthew-mcateer committed Mar 9, 2018
1 parent 01cdf41 commit 487d664
Show file tree
Hide file tree
Showing 70 changed files with 703 additions and 621 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,44 +113,74 @@
},
{
"cell_type": "code",
"execution_count": 11,
"execution_count": 6,
"metadata": {},
"outputs": [
{
"ename": "SyntaxError",
"evalue": "invalid syntax (<ipython-input-11-86135663c1da>, line 2)",
"output_type": "error",
"traceback": [
"\u001b[0;36m File \u001b[0;32m\"<ipython-input-11-86135663c1da>\"\u001b[0;36m, line \u001b[0;32m2\u001b[0m\n\u001b[0;31m dst = ssc.queueStream(transaction_rdd_queue) .transform(lambda rdd: rdd.join(customer_rdd)) .filter(lambda (customer_id, (customer_data, is_good_customer)): is_good_customer)\u001b[0m\n\u001b[0m ^\u001b[0m\n\u001b[0;31mSyntaxError\u001b[0m\u001b[0;31m:\u001b[0m invalid syntax\n"
]
}
],
"outputs": [],
"source": [
"# Join each streaming RDD with the batch customer RDD to filter out bad customers.\n",
"dst = ssc.queueStream(transaction_rdd_queue)\\\n",
" .transform(lambda rdd: rdd.join(customer_rdd))\\\n",
" .filter(lambda (customer_id, (customer_data, is_good_customer)): is_good_customer)\n",
"dst = ssc.queueStream(transaction_rdd_queue).transform(lambda rdd: rdd.join(customer_rdd)).filter(lambda rdd: rdd[1][1] == True)\n",
"## END OF EXERCISE SECTION ==================================\n",
"dst.pprint()"
]
},
{
"cell_type": "code",
"execution_count": 8,
"execution_count": 7,
"metadata": {},
"outputs": [
{
"ename": "Py4JJavaError",
"evalue": "An error occurred while calling o20.start.\n: java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute\n\tat scala.Predef$.require(Predef.scala:224)\n\tat org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:163)\n\tat org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:513)\n\tat org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:573)\n\tat org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)\n\tat org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)\n\tat py4j.Gateway.invoke(Gateway.java:280)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:214)\n\tat java.lang.Thread.run(Thread.java:748)\n",
"output_type": "error",
"traceback": [
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[0;31mPy4JJavaError\u001b[0m Traceback (most recent call last)",
"\u001b[0;32m<ipython-input-8-90c9a179f130>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m()\u001b[0m\n\u001b[0;32m----> 1\u001b[0;31m \u001b[0mssc\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mstart\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 2\u001b[0m \u001b[0mtime\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msleep\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;36m6\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 3\u001b[0m \u001b[0mssc\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mstop\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/spark-2.1.0-bin-hadoop2.7/python/pyspark/streaming/context.py\u001b[0m in \u001b[0;36mstart\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 194\u001b[0m \u001b[0mStart\u001b[0m \u001b[0mthe\u001b[0m \u001b[0mexecution\u001b[0m \u001b[0mof\u001b[0m \u001b[0mthe\u001b[0m \u001b[0mstreams\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 195\u001b[0m \"\"\"\n\u001b[0;32m--> 196\u001b[0;31m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jssc\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mstart\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 197\u001b[0m \u001b[0mStreamingContext\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_activeContext\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 198\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/spark-2.1.0-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py\u001b[0m in \u001b[0;36m__call__\u001b[0;34m(self, *args)\u001b[0m\n\u001b[1;32m 1131\u001b[0m \u001b[0manswer\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mgateway_client\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msend_command\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mcommand\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1132\u001b[0m return_value = get_return_value(\n\u001b[0;32m-> 1133\u001b[0;31m answer, self.gateway_client, self.target_id, self.name)\n\u001b[0m\u001b[1;32m 1134\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1135\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mtemp_arg\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mtemp_args\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/spark-2.1.0-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py\u001b[0m in \u001b[0;36mget_return_value\u001b[0;34m(answer, gateway_client, target_id, name)\u001b[0m\n\u001b[1;32m 317\u001b[0m raise Py4JJavaError(\n\u001b[1;32m 318\u001b[0m \u001b[0;34m\"An error occurred while calling {0}{1}{2}.\\n\"\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 319\u001b[0;31m format(target_id, \".\", name), value)\n\u001b[0m\u001b[1;32m 320\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 321\u001b[0m raise Py4JError(\n",
"\u001b[0;31mPy4JJavaError\u001b[0m: An error occurred while calling o20.start.\n: java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute\n\tat scala.Predef$.require(Predef.scala:224)\n\tat org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:163)\n\tat org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:513)\n\tat org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:573)\n\tat org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)\n\tat org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)\n\tat py4j.Gateway.invoke(Gateway.java:280)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:214)\n\tat java.lang.Thread.run(Thread.java:748)\n"
"name": "stdout",
"output_type": "stream",
"text": [
"-------------------------------------------\n",
"Time: 2018-03-03 08:00:19\n",
"-------------------------------------------\n",
"(0, (None, True))\n",
"(8, (None, True))\n",
"(4, (None, True))\n",
"(2, (None, True))\n",
"(6, (None, True))\n",
"\n",
"-------------------------------------------\n",
"Time: 2018-03-03 08:00:20\n",
"-------------------------------------------\n",
"(0, (None, True))\n",
"(8, (None, True))\n",
"(4, (None, True))\n",
"(2, (None, True))\n",
"(6, (None, True))\n",
"\n",
"-------------------------------------------\n",
"Time: 2018-03-03 08:00:21\n",
"-------------------------------------------\n",
"(0, (None, True))\n",
"(8, (None, True))\n",
"(4, (None, True))\n",
"(2, (None, True))\n",
"(6, (None, True))\n",
"\n",
"-------------------------------------------\n",
"Time: 2018-03-03 08:00:22\n",
"-------------------------------------------\n",
"(0, (None, True))\n",
"(8, (None, True))\n",
"(4, (None, True))\n",
"(2, (None, True))\n",
"(6, (None, True))\n",
"\n",
"-------------------------------------------\n",
"Time: 2018-03-03 08:00:23\n",
"-------------------------------------------\n",
"(0, (None, True))\n",
"(8, (None, True))\n",
"(4, (None, True))\n",
"(2, (None, True))\n",
"(6, (None, True))\n",
"\n",
"-------------------------------------------\n",
"Time: 2018-03-03 08:00:24\n",
"-------------------------------------------\n",
"\n"
]
}
],
Expand Down
Loading

0 comments on commit 487d664

Please sign in to comment.