Skip to content

Commit

Permalink
GEODE-3126: Adding a query command to the experimental driver
Browse files Browse the repository at this point in the history
Adding a QueryService and query operations to the experimental client.
  • Loading branch information
upthewaterspout committed Feb 28, 2018
1 parent 8ce5ebf commit c373f37
Show file tree
Hide file tree
Showing 9 changed files with 277 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public interface Driver {
*/
<K, V> Region<K, V> getRegion(String regionName);

QueryService getQueryService();

/**
* Close this Driver, rendering it useless
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ private Socket connectToAServer() throws IOException {

/**
* Queries locators for a Geode server that has Protobuf enabled.
*
*
* @return The server chosen by the Locator service for this client
*/
private InetSocketAddress findAServer() throws IOException {
Expand Down Expand Up @@ -140,6 +140,9 @@ private InetSocketAddress findAServer() throws IOException {
private Message readResponse() throws IOException {
final InputStream inputStream = socket.getInputStream();
Message response = ClientProtocol.Message.parseDelimitedFrom(inputStream);
if (response == null) {
throw new IOException("Unable to parse a response message due to EOF");
}
final ErrorResponse errorResponse = response.getErrorResponse();
if (errorResponse != null && errorResponse.hasError()) {
throw new IOException(errorResponse.getError().getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,6 @@
*/
@Experimental
public class ProtobufDriver implements Driver {
/**
* Set of Internet-address-or-host-name/port pairs of the locators to use to find GemFire servers
* that have Protobuf enabled.
*/
private final Set<InetSocketAddress> locators;

private final ProtobufChannel channel;

Expand All @@ -57,9 +52,6 @@ public class ProtobufDriver implements Driver {
* @throws IOException
*/
ProtobufDriver(Set<InetSocketAddress> locators) throws IOException {
this.locators = locators;


this.channel = new ProtobufChannel(locators);
}

Expand All @@ -84,6 +76,11 @@ public <K, V> Region<K, V> getRegion(String regionName) {
return new ProtobufRegion(regionName, channel);
}

@Override
public QueryService getQueryService() {
return new ProtobufQueryService(channel);
}

@Override
public void close() {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.experimental.driver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import com.google.protobuf.ProtocolStringList;

import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes;
import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes.EncodedValue;
import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes.Table;
import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol.Message;
import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol.Message.MessageTypeCase;
import org.apache.geode.internal.protocol.protobuf.v1.RegionAPI.OQLQueryRequest;
import org.apache.geode.internal.protocol.protobuf.v1.RegionAPI.OQLQueryResponse;

class ProtobufQueryService implements QueryService {
private final ProtobufChannel channel;

public ProtobufQueryService(ProtobufChannel channel) {
this.channel = channel;
}

@Override
public <T> Query newQuery(final String queryString) {
return new ProtobufQuery<T>(queryString);
}

class ProtobufQuery<T> implements Query<T> {

private final String queryString;

public ProtobufQuery(final String queryString) {
this.queryString = queryString;
}

@Override
public List<T> execute(final Object... bindParameters) throws IOException {
List<EncodedValue> encodedParameters = Arrays.asList(bindParameters).stream()
.map(ValueEncoder::encodeValue).collect(Collectors.toList());;
Message request = Message.newBuilder().setOqlQueryRequest(
OQLQueryRequest.newBuilder().addAllBindParameter(encodedParameters).setQuery(queryString))
.build();
final OQLQueryResponse response =
channel.sendRequest(request, MessageTypeCase.OQLQUERYRESPONSE).getOqlQueryResponse();
switch (response.getResultCase()) {
case SINGLERESULT:
return (List<T>) parseSingleResult(response);
case LISTRESULT:
return parseListResult(response);
case TABLERESULT:
return (List<T>) parseTableResult(response);
default:
throw new RuntimeException("Unexpected response: " + response);
}
}

private List<Map<String, Object>> parseTableResult(final OQLQueryResponse response) {
final Table table = response.getTableResult();
final ProtocolStringList fieldNames = table.getFieldNameList();
List<Map<String, Object>> results = new ArrayList<>();
for (BasicTypes.EncodedValueList row : table.getRowList()) {
final List<Object> decodedRow = row.getElementList().stream().map(ValueEncoder::decodeValue)
.collect(Collectors.toList());

Map<String, Object> rowMap = new LinkedHashMap<>(decodedRow.size());
for (int i = 0; i < decodedRow.size(); i++) {
rowMap.put(fieldNames.get(i), decodedRow.get(i));
}
}

return results;
}

private List<T> parseListResult(final OQLQueryResponse response) {
return response.getListResult().getElementList().stream()
.map(value -> (T) ValueEncoder.decodeValue(value)).collect(Collectors.toList());
}

private List<Object> parseSingleResult(final OQLQueryResponse response) {
return Collections.singletonList(ValueEncoder.decodeValue(response.getSingleResult()));
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.experimental.driver;

import java.io.IOException;
import java.util.List;

public interface Query<T> {

List<T> execute(Object... bindParameters) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.experimental.driver;

public interface QueryService {

<T> Query newQuery(String queryString);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.experimental.driver;

import java.util.Properties;

import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.contrib.java.lang.system.RestoreSystemProperties;

import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.distributed.ConfigurationProperties;
import org.apache.geode.distributed.Locator;

/**
* Created by dan on 2/23/18.
*/
public class IntegrationTestBase {
private static final String REGION = "region";
@Rule
public RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties();
protected Driver driver;
protected org.apache.geode.cache.Region<Object, Object> serverRegion;
private Locator locator;
private Cache cache;

@Before
public void createServerAndDriver() throws Exception {
System.setProperty("geode.feature-protobuf-protocol", "true");

// Create a cache
CacheFactory cf = new CacheFactory();
cf.set(ConfigurationProperties.MCAST_PORT, "0");
cache = cf.create();

// Start a locator
locator = Locator.startLocatorAndDS(0, null, new Properties());
int locatorPort = locator.getPort();

// Start a server
CacheServer server = cache.addCacheServer();
server.setPort(0);
server.start();

// Create a region
serverRegion = cache.createRegionFactory(RegionShortcut.REPLICATE).create(REGION);

// Create a driver connected to the server
driver = new DriverFactory().addLocator("localhost", locatorPort).create();

}

@After
public void cleanup() {
locator.stop();
cache.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.experimental.driver;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.geode.test.junit.categories.IntegrationTest;

@Category(IntegrationTest.class)
public class QueryServiceIntegrationTest extends IntegrationTestBase {

@Test
public void testQuery() throws IOException {
serverRegion.put("key1", "value1");
serverRegion.put("key2", "value2");

QueryService service = driver.getQueryService();

Query<String> query = service.newQuery("select value from /region value order by value");
final List<String> results = query.execute();

assertEquals(Arrays.asList("value1", "value2"), results);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@
import org.apache.geode.test.junit.categories.IntegrationTest;

@Category(IntegrationTest.class)
public class RegionIntegrationTest {
@Rule
public RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties();
public class RegionIntegrationTest extends IntegrationTestBase {

/** a JSON document */
private static final String jsonDocument =
Expand All @@ -54,47 +52,6 @@ public class RegionIntegrationTest {
+ System.lineSeparator() + " \"emailAddress\" : \"none\"" + System.lineSeparator() + "}";


private static final String REGION = "region";
private Locator locator;
private Cache cache;
private Driver driver;

private org.apache.geode.cache.Region<Object, Object> serverRegion;

@Before
public void createServerAndDriver() throws Exception {
System.setProperty("geode.feature-protobuf-protocol", "true");

// Create a cache
CacheFactory cf = new CacheFactory();
cf.set(ConfigurationProperties.MCAST_PORT, "0");
cache = cf.create();

// Start a locator
locator = Locator.startLocatorAndDS(0, null, new Properties());
int locatorPort = locator.getPort();

// Start a server
CacheServer server = cache.addCacheServer();
server.setPort(0);
server.start();

// Create a region
serverRegion = cache.createRegionFactory(RegionShortcut.REPLICATE).create(REGION);

// Create a driver connected to the server
driver = new DriverFactory().addLocator("localhost", locatorPort).create();

}

@After
public void cleanup() {
locator.stop();
cache.close();
}



@Test
public void getShouldReturnPutValue() throws Exception {
Region<String, String> region = driver.getRegion("region");
Expand Down

0 comments on commit c373f37

Please sign in to comment.