Skip to content

Commit

Permalink
Adding Spark example to Write Pojo classes to Stream (apache#5470)
Browse files Browse the repository at this point in the history
### Motivation

Added the class SensorReading and class ProducerSparkWithPojo  to showcase writing Pojo as Json Object to Pulsar Queue 

### Modifications

Based on the documentation written example class to write Pojos to Pulsar 
This might be more real life examples for application than bytes stream
  • Loading branch information
kishorebhat-p authored and sijie committed Nov 26, 2019
1 parent 14f3eee commit a88300c
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.spark.streaming.receiver.example;

import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.schema.JSONSchema;


/**
* producer data to spark streaming receiver with Json/Pojo Object
*
* <p>Example usage:
* pulsar://localhost:6650 test_src
*/
public class ProducerSparkWithPojo {


public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.err.println("Missing parameters!");
System.err.println("Usage: <pulsar-service-url> <topic>");
return;
}

System.out.println("Parameters:");
System.out.println("\tServiceUrl:\t" + args[0]);
System.out.println("\tTopic:\t" + args[1]);

try (PulsarClient client = PulsarClient.builder().serviceUrl(args[0]).build()) {
try (Producer<SensorReading> producer = client.newProducer(JSONSchema.of(SensorReading.class))
.topic(args[1]).sendTimeout(3, TimeUnit.SECONDS).create();) {
for (int i = 0; i < 100; i++) {
SensorReading rd = new SensorReading(i, "message " + i);
producer.send(rd);
}
}
}

System.out.println("producer spark streaming msg end ...");
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.spark.streaming.receiver.example;

/**
* Rest Pojo class to send to Producer
*
*/
public class SensorReading {

public SensorReading(int sensor_id, String sensor_value) {
super();
this.sensor_id = sensor_id;
this.sensor_value = sensor_value;
}

public int getSensor_id() {
return sensor_id;
}

public void setSensor_id(int sensor_id) {
this.sensor_id = sensor_id;
}

public String getSensor_value() {
return sensor_value;
}

public void setSensor_value(String sensor_value) {
this.sensor_value = sensor_value;
}

public int sensor_id;

public String sensor_value;

}

0 comments on commit a88300c

Please sign in to comment.