Skip to content

Commit

Permalink
apacheGH-1804: Configurable fuseki modules
Browse files Browse the repository at this point in the history
  • Loading branch information
afs committed Mar 18, 2023
1 parent 33c14fb commit c758d2b
Show file tree
Hide file tree
Showing 10 changed files with 370 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,7 @@
import org.apache.jena.fuseki.build.FusekiConfig;
import org.apache.jena.fuseki.ctl.*;
import org.apache.jena.fuseki.main.cmds.FusekiMain;
import org.apache.jena.fuseki.main.sys.FusekiErrorHandler;
import org.apache.jena.fuseki.main.sys.FusekiModuleStep;
import org.apache.jena.fuseki.main.sys.JettyLib;
import org.apache.jena.fuseki.main.sys.*;
import org.apache.jena.fuseki.metrics.MetricsProviderRegistry;
import org.apache.jena.fuseki.server.*;
import org.apache.jena.fuseki.servlets.*;
Expand Down Expand Up @@ -151,15 +149,18 @@ public static Builder create(OperationRegistry serviceDispatchRegistry) {
private int httpsPort;
private final String staticContentDir;
private final ServletContext servletContext;
private final FusekiModules modules;

private FusekiServer(int httpPort, int httpsPort, Server server,
String staticContentDir,
FusekiModules modules,
ServletContext fusekiServletContext) {
this.server = server;
this.server = Objects.requireNonNull(server);
this.httpPort = httpPort;
this.httpsPort = httpsPort;
this.staticContentDir = staticContentDir;
this.servletContext = fusekiServletContext;
this.servletContext = Objects.requireNonNull(fusekiServletContext);
this.modules = Objects.requireNonNull(modules);
}

/**
Expand Down Expand Up @@ -280,6 +281,13 @@ public String getStaticContentDir() {
return staticContentDir;
}

/**
* Return the list of {@link FusekiModule}s for this server.
*/
public FusekiModules getModules() {
return modules;
}

/**
* Start the server - the server continues to run after this call returns.
* To synchronise with the server stopping, call {@link #join}.
Expand All @@ -288,7 +296,6 @@ public FusekiServer start() {
try {
FusekiModuleStep.serverBeforeStarting(this);
server.start();
FusekiModuleStep.serverAfterStarting(this);
}
catch (IOException ex) {
if ( ex.getCause() instanceof java.security.UnrecoverableKeyException )
Expand All @@ -303,6 +310,7 @@ public FusekiServer start() {
throw new FusekiException(ex);
}

// Post-start completion. Find the ports.
Connector[] connectors = server.getServer().getConnectors();
if ( connectors.length == 0 )
serverLog.warn("Start Fuseki: No connectors");
Expand All @@ -318,6 +326,8 @@ public FusekiServer start() {
}
});

FusekiModuleStep.serverAfterStarting(this);

if ( httpsPort > 0 && httpPort > 0 )
Fuseki.serverLog.info("Start Fuseki (http="+httpPort+" https="+httpsPort+")");
else if ( httpsPort > 0 )
Expand Down Expand Up @@ -415,6 +425,10 @@ public static class Builder {
private List<Pair<String, Filter>> beforeFilters = new ArrayList<>();
private List<Pair<String, Filter>> afterFilters = new ArrayList<>();

// Modules to use to process the building of the server.
// The default (fusekiModules is null) is the system-wide modules.
private FusekiModules fusekiModules = null;

private String contextPath = "/";
private String staticContentDir = null;
private SecurityHandler securityHandler = null;
Expand Down Expand Up @@ -1037,6 +1051,26 @@ public Builder addFilter(String pathSpec, Filter filter) {
return this;
}

/**
* Set the {@link FusekiModule Fuseki Module} for a server.
* If no modules are added to a builder, then the system-wide default set (found by loading FusekiModule
* via Java's {@link ServiceLoader} mechanism) is used.
* <p>Pass {@code null} to switch back the system-wide default set.
*
* @see FusekiModules
*/
public Builder setModules(FusekiModules modules) {
fusekiModules = modules;
return this;
}

/**
* Return the current list of Fuseki modules in the builder.
*/
public FusekiModules getFusekiModules() {
return fusekiModules;
}

/**
* Add an operation and handler to the server. This does not enable it for any dataset.
* <p>
Expand Down Expand Up @@ -1192,9 +1226,13 @@ public FusekiServer build() {
if ( serverHttpPort < 0 && serverHttpsPort < 0 )
serverHttpPort = DefaultServerPort;

FusekiModules modules = (fusekiModules == null)
? FusekiModulesSystem.get()
: fusekiModules;

// FusekiModule call - final preparations.
Set<String> datasetNames = Set.copyOf(dataServices.keys());
FusekiModuleStep.prepare(this, datasetNames, configModel);
FusekiModuleStep.prepare(modules, this, datasetNames, configModel);

// Freeze operation registry (builder may be reused).
OperationRegistry operationReg = new OperationRegistry(operationRegistry);
Expand All @@ -1203,7 +1241,7 @@ public FusekiServer build() {
DataAccessPointRegistry dapRegistry = buildStart();

// FusekiModule call - inspect the DataAccessPointRegistry.
FusekiModuleStep.configured(this, dapRegistry, configModel);
FusekiModuleStep.configured(modules, this, dapRegistry, configModel);

// Setup Prometheus metrics. This will become a module.
bindPrometheus(dapRegistry);
Expand All @@ -1228,7 +1266,7 @@ public FusekiServer build() {

if ( jettyServerConfig != null ) {
Server server = jettyServer(handler, jettyServerConfig);
return new FusekiServer(-1, -1, server, staticContentDir, handler.getServletContext());
return new FusekiServer(-1, -1, server, staticContentDir, modules, handler.getServletContext());
}

Server server;
Expand All @@ -1245,7 +1283,7 @@ public FusekiServer build() {
if ( networkLoopback )
applyLocalhost(server);

FusekiServer fusekiServer = new FusekiServer(httpPort, httpsPort, server, staticContentDir, handler.getServletContext());
FusekiServer fusekiServer = new FusekiServer(httpPort, httpsPort, server, staticContentDir, modules, handler.getServletContext());
FusekiModuleStep.server(fusekiServer);
return fusekiServer;
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,42 +26,55 @@

/** Call points for FusekiModule extensions */
public class FusekiModuleStep {

/**
* Call at the start of "build" step.
* The builder has been set according to the configuration.
* The "configModel" parameter is set if a configuration file was used else it is null.
*/
public static void prepare(FusekiModules modules, FusekiServer.Builder serverBuilder, Set<String> datasetNames, Model configModel) {
modules.forEach(module -> module.prepare(serverBuilder, datasetNames, configModel));
}

/** @deprecated Use {@code prepare(FusekiModules, ...)}. */
@Deprecated
public static void prepare(FusekiServer.Builder serverBuilder, Set<String> datasetNames, Model configModel) {
FusekiModules.forEachModule(module -> module.prepare(serverBuilder, datasetNames, configModel));
prepare(systemModules(), serverBuilder, datasetNames, configModel);
}

/**
* The DataAccessPointRegistry that will be used to build the server.
*
*/
public static void configured(FusekiModules modules, FusekiServer.Builder serverBuilder, DataAccessPointRegistry dapRegistry, Model configModel) {
modules.forEach(module -> module.configured(serverBuilder, dapRegistry, configModel));
}

/** @deprecated Use {@code configured(FusekiModules.loaded(), ...)}. */
@Deprecated
public static void configured(FusekiServer.Builder serverBuilder, DataAccessPointRegistry dapRegistry, Model configModel) {
FusekiModules.forEachModule(module -> module.configured(serverBuilder, dapRegistry, configModel));
configured(systemModules(), serverBuilder, dapRegistry, configModel);
}

/**
* The outcome of the "build" step.
*/
public static void server(FusekiServer server) {
FusekiModules.forEachModule(module -> module.server(server));
server.getModules().forEach(module -> module.server(server));
}

/**
* Called just before {@code server.start()} called.
*/
public static void serverBeforeStarting(FusekiServer server) {
FusekiModules.forEachModule(module -> module.serverBeforeStarting(server));
server.getModules().forEach(module -> module.serverBeforeStarting(server));
}

/**
* Called just after {@code server.start()} called.
*/
public static void serverAfterStarting(FusekiServer server) {
FusekiModules.forEachModule(module -> module.serverAfterStarting(server));
server.getModules().forEach(module -> module.serverAfterStarting(server));
}

/**
Expand All @@ -70,6 +83,10 @@ public static void serverAfterStarting(FusekiServer server) {
* simply exits the JVM or is killed externally.
*/
public static void serverStopped(FusekiServer server) {
FusekiModules.forEachModule(module -> module.serverStopped(server));
server.getModules().forEach(module -> module.serverStopped(server));
}

private static FusekiModules systemModules() {
return FusekiModulesSystem.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,67 +18,57 @@

package org.apache.jena.fuseki.main.sys;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;

import org.apache.jena.base.module.Subsystem;

/** Registry of modules */
/**
* List of {@linkplain FusekiModule Fuseki modules}.
* This is the immutable collection of modules for a server.
* <p>
* @see FusekiModulesLoaded
*/
public class FusekiModules {

private static Object lock = new Object();
/** A Fuseki module with no members. */
public static final FusekiModules empty = FusekiModules.create();

// Record of what is loaded.
private static List<FusekiModule> registry = null;

private static Subsystem<FusekiModule> subsystem = null;
/** Create a collection of Fuseki modules */
public static FusekiModules create(FusekiModule ... modules) {
return new FusekiModules(modules);
}

public static void load() {
if ( registry == null )
reload();
/** Create a collection of Fuseki modules */
public static FusekiModules create(List<FusekiModule> modules) {
return new FusekiModules(modules);
}

public static void reload() {
registry = new ArrayList<>();
subsystem = new Subsystem<FusekiModule>(FusekiModule.class);
subsystem.initialize();
synchronized(lock) {
subsystem.forEach(registry::add);
}
private final List<FusekiModule> modules;

private FusekiModules(FusekiModule ... modules) {
this.modules = List.of(Objects.requireNonNull(modules));
}

/** Add a code module */
public static void add(FusekiModule module) {
synchronized(lock) {
load();
module.start();
registry.add(module);
}
private FusekiModules(List<FusekiModule> modules) {
this.modules = List.copyOf(Objects.requireNonNull(modules));
}

/** Remove a code module */
public static void remove(FusekiModule module) {
synchronized(lock) {
registry.remove(module);
module.stop();
}
/**
* Return an immutable list of modules.
*/
public List<FusekiModule> asList() {
return List.copyOf(modules);
}

/** Test whether a code module is registered. */
public static boolean contains(FusekiModule module) {
synchronized(lock) {
return registry.contains(module);
}
/**
* Apply an action to each module, in order, one at a time.
*/
public void forEach(Consumer<FusekiModule> action) {
modules.forEach(action);
}

/*package*/ static void forEachModule(Consumer<FusekiModule> action) {
synchronized(lock) {
if ( registry == null )
load();
if ( registry == null || registry.isEmpty() )
return ;
registry.forEach(action);
}
/** Test whether a code module is registered. */
public boolean contains(FusekiModule module) {
return modules.contains(module);
}
}
Loading

0 comments on commit c758d2b

Please sign in to comment.