Skip to content

Commit

Permalink
fs2: set executor
Browse files Browse the repository at this point in the history
  • Loading branch information
jtjeferreira committed May 18, 2021
1 parent 8250933 commit aaac311
Showing 1 changed file with 38 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,51 @@ import fs2.grpc.syntax.all._
import io.grpc.examples.helloworld.helloworld.GreeterFs2Grpc
import io.grpc.{ServerBuilder, ServerServiceDefinition}

import java.util.concurrent.Executors


object GreeterServer extends IOApp {

def run(args: List[String]): IO[ExitCode] = {
val service: Resource[IO, ServerServiceDefinition] =
GreeterFs2Grpc.bindServiceResource[IO](new GreeterServiceImpl())

def run(service: ServerServiceDefinition) = ServerBuilder
.forPort(50051)
def serverBuilder: ServerBuilder[_] = {
val sb = ServerBuilder.forPort(50051)

/**
* Allow customization of the Executor with two environment variables:
*
* <p>
* <ul>
* <li>JVM_EXECUTOR_TYPE: direct, workStealing, single, fixed, cached</li>
* <li>JVM_EXECUTOR_THREADS: integer value.</li>
* </ul>
* </p>
*
* The number of Executor Threads will default to the number of
* availableProcessors(). Only the workStealing and fixed executors will use
* this value.
*/
val threads = System.getenv("JVM_EXECUTOR_THREADS")
var i_threads = Runtime.getRuntime.availableProcessors
if (threads != null && !threads.isEmpty) i_threads = threads.toInt
/*
* Use a Direct Executor by default (best performance) since the GRPC
* service in this code is guaranteed non-blocking
*/
val value = System.getenv.getOrDefault("JVM_EXECUTOR_TYPE", "workStealing")
value match {
case "direct" => sb.directExecutor
case "single" => sb.executor(Executors.newSingleThreadExecutor)
case "fixed" => sb.executor(Executors.newFixedThreadPool(i_threads))
case "workStealing" => sb.executor(Executors.newWorkStealingPool(i_threads))
case "cached" => sb.executor(Executors.newCachedThreadPool)
}
sb
}

def run(service: ServerServiceDefinition) = serverBuilder
.addService(service)
.resource[IO]
.evalMap(server => IO(server.start()))
Expand Down

0 comments on commit aaac311

Please sign in to comment.