Skip to content

Commit

Permalink
Improve GeoIpDownloaderIT test suite and improve geoip failure taggin…
Browse files Browse the repository at this point in the history
…g. (elastic#79131)

When a database can't be loaded, the geoip processor factory checks
whether any databases are available and then returns a processor
implementation that tags documents to indicate that the required database
wasn't available. The GeoIpProcessor itself also loads the database, but
when the database can't be loaded it always fails with a resource
missing exception. This change modifies the GeoIpProcessor so that it
also checks whether any database is available and, in that case, tags
documents instead of failing.

GeoIpDownloaderIT improvements:
* The `testUseGeoIpProcessorWithDownloadedDBs()` test was adding databases to the config dirs,
  but not cleaning them up, which broke the assumptions of other tests in this suite because
  the test cluster is reused.
* Use the geoip stats api after each test to wait for a clean state, which means
  wait for database downloader to be disabled and all database files to be removed
  on all ingest nodes.
* Don't use `IngestDocument#getFieldValue(...)` in test code surrounded by `assertBusy(...)`.
  If a field isn't there, an illegal state exception is thrown, which isn't caught by
  `assertBusy(...)`; only assertion errors are handled.

Closes elastic#79074
  • Loading branch information
martijnvg authored Oct 14, 2021
1 parent bf15ccc commit 5e18333
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xcontent.json.JsonXContent;
Expand Down Expand Up @@ -109,6 +110,18 @@ public void cleanUp() throws Exception {
assertThat(state.getDatabases(), anEmptyMap());
}
});
assertBusy(() -> {
GeoIpDownloaderStatsAction.Response response =
client().execute(GeoIpDownloaderStatsAction.INSTANCE, new GeoIpDownloaderStatsAction.Request()).actionGet();
assertThat(response.getStats().getDatabasesCount(), equalTo(0));
assertThat(response.getNodes(), not(empty()));
for (GeoIpDownloaderStatsAction.NodeResponse nodeResponse : response.getNodes()) {
assertThat(nodeResponse.getConfigDatabases(), empty());
assertThat(nodeResponse.getDatabases(), empty());
assertThat(nodeResponse.getFilesInTemp().stream().filter(s -> s.endsWith(".txt") == false).collect(Collectors.toList()),
empty());
}
});
assertBusy(() -> {
List<Path> geoIpTmpDirs = getGeoIpTmpDirs();
for (Path geoIpTmpDir : geoIpTmpDirs) {
Expand Down Expand Up @@ -263,7 +276,6 @@ public void testGeoIpDatabasesDownload() throws Exception {
}
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/79074")
@TestLogging(value = "org.elasticsearch.ingest.geoip:TRACE", reason = "https://github.com/elastic/elasticsearch/issues/69972")
public void testUseGeoIpProcessorWithDownloadedDBs() throws Exception {
assumeTrue("only test with fixture to have stable results", ENDPOINT != null);
Expand Down Expand Up @@ -310,9 +322,9 @@ public void testUseGeoIpProcessorWithDownloadedDBs() throws Exception {
}
}
});
deleteDatabasesInConfigDirectory();
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/79074")
public void testStartWithNoDatabases() throws Exception {
assumeTrue("only test with fixture to have stable results", ENDPOINT != null);
putPipeline();
Expand All @@ -330,31 +342,24 @@ public void testStartWithNoDatabases() throws Exception {
// Enable downloader:
Settings.Builder settings = Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), true);
assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(settings));
{
assertBusy(() -> {
SimulateDocumentBaseResult result = simulatePipeline();
assertThat(result.getFailure(), nullValue());
assertThat(result.getIngestDocument(), notNullValue());
Map<?, ?> source = result.getIngestDocument().getSourceAndMetadata();
assertThat(source, not(hasKey("tags")));
assertThat(source, hasKey("ip-city"));
assertThat(source, hasKey("ip-asn"));
assertThat(source, hasKey("ip-country"));

assertThat(((Map<?, ?>) source.get("ip-city")).get("city_name"), equalTo("Linköping"));
assertThat(((Map<?, ?>) source.get("ip-asn")).get("organization_name"), equalTo("Bredband2 AB"));
assertThat(((Map<?, ?>) source.get("ip-country")).get("country_name"), equalTo("Sweden"));
});
}
verifyUpdatedDatabase();
}

/**
 * Repeatedly simulates the pipeline until the resulting document carries the expected
 * {@code ip-city}, {@code ip-asn} and {@code ip-country} data and no {@code tags} field.
 *
 * Only Hamcrest assertions are used inside {@code assertBusy(...)}: the document is checked
 * for null before it is dereferenced, and fields are read from the source map rather than via
 * {@code IngestDocument#getFieldValue(...)}, so a not-yet-populated field surfaces as an
 * assertion error that is retried instead of as another exception type.
 *
 * @throws Exception if the expected state is not reached
 */
private void verifyUpdatedDatabase() throws Exception {
    assertBusy(() -> {
        SimulateDocumentBaseResult result = simulatePipeline();
        assertThat(result.getFailure(), nullValue());
        // Guard before dereferencing the document below.
        assertThat(result.getIngestDocument(), notNullValue());

        Map<?, ?> source = result.getIngestDocument().getSourceAndMetadata();
        assertThat(source, not(hasKey("tags")));
        assertThat(source, hasKey("ip-city"));
        assertThat(source, hasKey("ip-asn"));
        assertThat(source, hasKey("ip-country"));

        assertThat(((Map<?, ?>) source.get("ip-city")).get("city_name"), equalTo("Linköping"));
        assertThat(((Map<?, ?>) source.get("ip-asn")).get("organization_name"), equalTo("Bredband2 AB"));
        assertThat(((Map<?, ?>) source.get("ip-country")).get("country_name"), equalTo("Sweden"));
    });
}

Expand Down Expand Up @@ -487,6 +492,29 @@ private void setupDatabasesInConfigDirectory() throws Exception {
});
}

/**
 * Removes the {@code ingest-geoip} directory below the config directory of every node in the
 * internal test cluster, then waits until the geoip stats API reports no config databases on
 * any node.
 *
 * @throws Exception if the stats never reach the expected state
 * @throws UncheckedIOException if removing a directory fails
 */
private void deleteDatabasesInConfigDirectory() throws Exception {
    // Several nodes may resolve to the same directory, so de-duplicate before deleting.
    List<Path> geoIpConfigDirs = StreamSupport.stream(internalCluster().getInstances(Environment.class).spliterator(), false)
        .map(environment -> environment.configFile().resolve("ingest-geoip"))
        .distinct()
        .collect(Collectors.toList());

    for (Path geoIpConfigDir : geoIpConfigDirs) {
        try {
            IOUtils.rm(geoIpConfigDir);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    assertBusy(() -> {
        GeoIpDownloaderStatsAction.Request statsRequest = new GeoIpDownloaderStatsAction.Request();
        GeoIpDownloaderStatsAction.Response stats = client().execute(GeoIpDownloaderStatsAction.INSTANCE, statsRequest).actionGet();
        assertThat(stats.getNodes(), not(empty()));
        for (GeoIpDownloaderStatsAction.NodeResponse node : stats.getNodes()) {
            assertThat(node.getConfigDatabases(), empty());
        }
    });
}

@SuppressForbidden(reason = "Maxmind API requires java.io.File")
private void parseDatabase(Path tempFile) throws IOException {
try (DatabaseReader databaseReader = new DatabaseReader.Builder(tempFile.toFile()).build()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ public final class GeoIpProcessor extends AbstractProcessor {
private final Set<Property> properties;
private final boolean ignoreMissing;
private final boolean firstOnly;
private final String databaseFile;

/**
* Construct a geo-IP processor.
*
* @param tag the processor tag
* @param tag the processor tag
* @param description the processor description
* @param field the source field to geo-IP map
* @param supplier a supplier of a geo-IP database reader; ideally this is lazily-loaded once on first use
Expand All @@ -83,6 +83,7 @@ public final class GeoIpProcessor extends AbstractProcessor {
* @param properties the properties; ideally this is lazily-loaded once on first use
* @param ignoreMissing true if documents with a missing value for the field should be ignored
* @param firstOnly true if only first result should be returned in case of array
* @param databaseFile the name of the database file; used to tag documents when no database reader is supplied
*/
GeoIpProcessor(
final String tag,
Expand All @@ -93,7 +94,8 @@ public final class GeoIpProcessor extends AbstractProcessor {
final String targetField,
final Set<Property> properties,
final boolean ignoreMissing,
final boolean firstOnly) {
final boolean firstOnly,
final String databaseFile) {
super(tag, description);
this.field = field;
this.isValid = isValid;
Expand All @@ -102,6 +104,7 @@ public final class GeoIpProcessor extends AbstractProcessor {
this.properties = properties;
this.ignoreMissing = ignoreMissing;
this.firstOnly = firstOnly;
this.databaseFile = databaseFile;
}

boolean isIgnoreMissing() {
Expand All @@ -121,8 +124,14 @@ public IngestDocument execute(IngestDocument ingestDocument) throws IOException
throw new IllegalArgumentException("field [" + field + "] is null, cannot extract geoip information.");
}

DatabaseReaderLazyLoader lazyLoader = this.supplier.get();
if (lazyLoader == null) {
tag(ingestDocument, databaseFile);
return ingestDocument;
}

if (ip instanceof String) {
Map<String, Object> geoData = getGeoData((String) ip);
Map<String, Object> geoData = getGeoData(lazyLoader, (String) ip);
if (geoData.isEmpty() == false) {
ingestDocument.setFieldValue(targetField, geoData);
}
Expand All @@ -133,7 +142,7 @@ public IngestDocument execute(IngestDocument ingestDocument) throws IOException
if (ipAddr instanceof String == false) {
throw new IllegalArgumentException("array in field [" + field + "] should only contain strings");
}
Map<String, Object> geoData = getGeoData((String) ipAddr);
Map<String, Object> geoData = getGeoData(lazyLoader, (String) ipAddr);
if (geoData.isEmpty()) {
geoDataList.add(null);
continue;
Expand All @@ -154,8 +163,7 @@ public IngestDocument execute(IngestDocument ingestDocument) throws IOException
return ingestDocument;
}

private Map<String, Object> getGeoData(String ip) throws IOException {
DatabaseReaderLazyLoader lazyLoader = this.supplier.get();
private Map<String, Object> getGeoData(DatabaseReaderLazyLoader lazyLoader, String ip) throws IOException {
try {
final String databaseType = lazyLoader.getDatabaseType();
final InetAddress ipAddress = InetAddresses.forString(ip);
Expand Down Expand Up @@ -431,7 +439,9 @@ public Processor create(
}
CheckedSupplier<DatabaseReaderLazyLoader, IOException> supplier = () -> {
DatabaseReaderLazyLoader loader = databaseRegistry.getDatabase(databaseFile);
if (loader == null) {
if (loader == null && databaseRegistry.getAvailableDatabases().isEmpty() == false) {
return null;
} else if (loader == null) {
throw new ResourceNotFoundException("database file [" + databaseFile + "] doesn't exist");
}
// Only check whether the suffix has changed and not the entire database type.
Expand Down Expand Up @@ -467,7 +477,7 @@ public Processor create(
return valid;
};
return new GeoIpProcessor(processorTag, description, ipField, supplier, isValid, targetField, properties, ignoreMissing,
firstOnly);
firstOnly, databaseFile);
}
}

Expand Down Expand Up @@ -543,7 +553,7 @@ static class DatabaseUnavailableProcessor extends AbstractProcessor {

@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
ingestDocument.appendFieldValue("tags", "_geoip_database_unavailable_" + databaseName, true);
tag(ingestDocument, databaseName);
return ingestDocument;
}

Expand All @@ -556,4 +566,8 @@ public String getDatabaseName() {
return databaseName;
}
}

/**
 * Appends a {@code _geoip_database_unavailable_<databaseName>} entry to the document's
 * {@code tags} field.
 *
 * @param ingestDocument the document to tag
 * @param databaseName the name of the database that could not be used
 */
private static void tag(IngestDocument ingestDocument, String databaseName) {
    final String unavailableTag = "_geoip_database_unavailable_" + databaseName;
    ingestDocument.appendFieldValue("tags", unavailableTag, true);
}
}
Loading

0 comments on commit 5e18333

Please sign in to comment.