forked from traccar/traccar
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
initial flespi integtation: pulling messages from flespi channels and…
… updating devices' position
- Loading branch information
1 parent
e876250
commit 5193bd8
Showing
6 changed files
with
179 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
package org.traccar.flespi; | ||
|
||
public class ChannelPullTask implements Runnable { | ||
|
||
private final FlespiClient flespiClient; | ||
|
||
protected ChannelPullTask(FlespiClient flespiClient) { | ||
this.flespiClient = flespiClient; | ||
} | ||
|
||
@Override | ||
public void run() { | ||
flespiClient.channelPull(); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,128 @@ | ||
package org.traccar.flespi; | ||
|
||
|
||
import com.ning.http.client.AsyncCompletionHandler; | ||
import com.ning.http.client.Response; | ||
import org.traccar.Context; | ||
import org.traccar.helper.Log; | ||
import org.traccar.model.Device; | ||
import org.traccar.model.Position; | ||
|
||
import javax.json.Json; | ||
import javax.json.JsonObject; | ||
import javax.json.JsonNumber; | ||
import javax.json.JsonReader; | ||
import javax.json.JsonArray; | ||
import java.util.Date; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.ScheduledExecutorService; | ||
import java.util.concurrent.ScheduledFuture; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
public class FlespiClient { | ||
private final String url; | ||
private final String token; | ||
private final String channel_id; | ||
private final ScheduledExecutorService pullChannelExecutor = Executors.newScheduledThreadPool(5); | ||
private ScheduledFuture<?> pullTask; | ||
private Integer pullDelay = 5; | ||
private int nextKey; | ||
|
||
public FlespiClient(String url, String token, String channelId) { | ||
this.channel_id = channelId; | ||
this.url = url + "/gw/channels/" + channelId + "/messages?data={\"limit_count\":1000000," | ||
+ "\"limit_size\":100000000,\"delete\":true,\"timeout\":25,\"curr_key\":%d}"; | ||
this.token = "FlespiToken " + token; | ||
|
||
schedulePull(); | ||
} | ||
|
||
private void schedulePull() { | ||
pullTask = pullChannelExecutor.scheduleAtFixedRate(new ChannelPullTask(this), 1, pullDelay, TimeUnit.SECONDS); | ||
} | ||
|
||
public void stopPullTask() { | ||
if (pullTask != null) { | ||
pullTask.cancel(false); | ||
} | ||
} | ||
|
||
protected synchronized void channelPull() { | ||
Context.getAsyncHttpClient().prepareGet(String.format(this.url, nextKey)) | ||
.addHeader("Authorization", this.token) | ||
.execute(new AsyncCompletionHandler() { | ||
@Override | ||
public Object onCompleted(Response response) throws Exception { | ||
try (JsonReader reader = Json.createReader(response.getResponseBodyAsStream())) { | ||
JsonObject object = reader.readObject(); | ||
JsonArray result = object.getJsonArray("result"); | ||
nextKey = object.getInt("next_key", nextKey); | ||
Log.debug(String.format("channelPull next_key=%d msgs_count=%d", nextKey, result.size())); | ||
for (int i = 0; i < result.size(); i++) { | ||
Position position = decodePosition(result.getJsonObject(i)); | ||
if (position != null && position.getLatitude() != 0 && position.getLongitude() != 0) { | ||
Context.getConnectionManager().updateDevice(position.getDeviceId(), | ||
Device.STATUS_ONLINE, new Date()); | ||
Context.getDeviceManager().updateLatestPosition(position); | ||
} | ||
} | ||
JsonArray errors = object.getJsonArray("errors"); | ||
if (errors != null) { | ||
for (int i = 0; i < errors.size(); i++) { | ||
JsonObject error = errors.getJsonObject(i); | ||
Log.warning("Error in flespi channel: " + error.toString()); | ||
} | ||
if (result == null || result.size() == 0) { | ||
stopPullTask(); | ||
} | ||
} | ||
} | ||
return null; | ||
} | ||
|
||
@Override | ||
public void onThrowable(Throwable t) { | ||
t.printStackTrace(); | ||
} | ||
}); | ||
} | ||
|
||
private Position decodePosition(JsonObject msg) { | ||
Device device = null; | ||
try { | ||
device = Context.getIdentityManager().getByUniqueId(msg.getString("ident")); | ||
} catch (Exception e) { | ||
e.printStackTrace(); | ||
return null; | ||
} | ||
if (device == null) { | ||
return null; | ||
} | ||
Position position = new Position(); | ||
position.setDeviceId(device.getId()); | ||
|
||
|
||
position.setProtocol("flespi"); | ||
|
||
position.setTime(new Date((long) msg.getJsonNumber("timestamp").doubleValue() * 1000)); | ||
JsonNumber lat = msg.getJsonNumber("position.latitude"); | ||
JsonNumber lon = msg.getJsonNumber("position.longitude"); | ||
position.setLatitude((lat != null && lon != null) ? lat.doubleValue() : 0); | ||
position.setLongitude((lat != null && lon != null) ? lon.doubleValue() : 0); | ||
|
||
JsonNumber speed = msg.getJsonNumber("position.speed"); | ||
position.setSpeed(speed != null ? speed.doubleValue() : 0); | ||
|
||
JsonNumber course = msg.getJsonNumber("position.direction"); | ||
position.setCourse(course != null ? course.doubleValue() : 0); | ||
|
||
JsonNumber altitude = msg.getJsonNumber("position.altitude"); | ||
position.setAltitude(altitude != null ? altitude.doubleValue() : 0); | ||
|
||
int satellites = msg.getInt("position.satellites", 0); | ||
position.setValid(lat != null && lon != null && satellites >= 3); | ||
position.set(Position.KEY_SATELLITES, satellites); | ||
|
||
return position; | ||
} | ||
} |