diff --git a/site2/docs/admin-api-schemas.md b/site2/docs/admin-api-schemas.md new file mode 100644 index 0000000000000..d1f45d8583562 --- /dev/null +++ b/site2/docs/admin-api-schemas.md @@ -0,0 +1,98 @@ +--- +id: admin-api-schemas +title: Managing Schemas +sidebar_label: Schemas +--- + +Schemas, like other entities in Pulsar, can be managed using the [admin API](admin-api-overview.md). + +## Schema resources + +A Pulsar schema is a fairly simple data structure stored in Pulsar for representing the structure of messages stored in a Pulsar topic. The schema structure consists of: + +- *Name*: A schema's name is the topic that the schema is associated with. +- *Type*: A schema type represents the type of the schema. The predefined schema types can be found [here](concepts-schema-registry.md#supported-schema-formats). If it + is a customized schema, it is left as an empty string. +- *Payload*: A binary representation of the schema. How to interpret it is up to the implementation of the schema. +- *Properties*: User-defined properties as a string/string map. Applications can use this bag to carry any application-specific logic. Possible properties + might be the Git hash associated with the schema, an environment string like `dev` or `prod`, etc. + +All schemas are versioned, so you can retrieve the schema definition of a given version as long as that version has not been deleted. + +### Upload Schema + +#### pulsar-admin + +You can upload a new schema using the [`upload`](reference-pulsar-admin.md#get-5) subcommand: + +```shell +$ pulsar-admin schemas upload --filename /path/to/schema-definition-file +``` + +The schema definition file should contain a JSON string that defines the schema, as in the following example: + +```json +{ + "type": "STRING", + "schema": "", + "properties": { + "key1" : "value1" + } +} +``` + +An example of the schema definition file can be found at {@inject: github:SchemaExample:/conf/schema_example.conf}. 
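
As a rough sketch, a definition file like the one above could be generated from Python using only the standard library. The helper name `build_schema_definition` and the output file name `schema-definition.json` are hypothetical, and the keys simply mirror the JSON example above; none of this is part of Pulsar's tooling.

```python
import json


def build_schema_definition(schema_type, schema="", properties=None):
    """Return a JSON string shaped like the schema definition file above.

    Hypothetical helper: the keys mirror the documented example only.
    """
    definition = {
        "type": schema_type,
        "schema": schema,
        "properties": dict(properties or {}),
    }
    return json.dumps(definition, indent=2)


if __name__ == "__main__":
    # Write the definition to a file that could then be passed to --filename.
    with open("schema-definition.json", "w") as f:
        f.write(build_schema_definition("STRING", properties={"key1": "value1"}))
```

Generating the file this way keeps the properties map (for example a Git hash or an environment name) under the control of a build script rather than hand-edited JSON.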
+ +#### REST + +{@inject: endpoint|POST|/admin/v2/schemas/:tenant/:namespace/:topic/schema|operation/uploadSchema} + +### Get Schema + +#### pulsar-admin + +You can get the latest version of Schema using the [`get`](reference-pulsar-admin.md#get-5) subcommand. + +```shell +$ pulsar-admin schemas get +{ + "version": 0, + "type": "String", + "timestamp": 0, + "data": "string", + "properties": { + "property1": "string", + "property2": "string" + } +} +``` + +You can also retrieve the Schema of a given version by specifying `--version` option. + +```shell +$ pulsar-admin schemas get --version +``` + +#### REST API + +Retrieve the latest version of the schema: + +{@inject: endpoint|GET|/admin/v2/schemas/:tenant/:namespace/:topic/schema|operation/getSchema} + +Retrieve the schema of a given version: + +{@inject: endpoint|GET|/admin/v2/schemas/:tenant/:namespace/:topic/schema/:version|operation/getSchema} + +### Delete Schema + +#### pulsar-admin + +You can delete a schema using the [`delete`](reference-pulsar-admin.md#delete-8) subcommand. + +```shell +$ pulsar-admin schemas delete +``` + +#### REST API + +{@inject: endpoint|DELETE|/admin/v2/schemas/:tenant/:namespace/:topic/schema|operation/deleteSchema} \ No newline at end of file diff --git a/site2/docs/concepts-schema-registry.md b/site2/docs/concepts-schema-registry.md new file mode 100644 index 0000000000000..50bb9bbed904c --- /dev/null +++ b/site2/docs/concepts-schema-registry.md @@ -0,0 +1,81 @@ +--- +id: concepts-schema-registry +title: Schema Registry +sidebar_label: Schema Registry +--- + +Type safety is extremely important in any application built around a message bus like Pulsar. Producers and consumers need some kind of mechanism for coordinating types at the topic level lest a wide variety of potential problems arise (for example serialization and deserialization issues). Applications typically adopt one of two basic approaches to type safety in messaging: + +1. 
A "client-side" approach in which message producers and consumers are responsible for not only serializing and deserializing messages (which consist of raw bytes) but also "knowing" which types are being transmitted via which topics. If a producer is sending temperature sensor data on the topic `topic-1`, consumers of that topic will run into trouble if they attempt to parse that data as, say, moisture sensor readings. +1. A "server-side" approach in which producers and consumers inform the system which data types can be transmitted via the topic. With this approach, the messaging system enforces type safety and ensures that producers and consumers remain synced. + +Both approaches are available in Pulsar, and you're free to adopt one or the other or to mix and match on a per-topic basis. + +1. For the "client-side" approach, producers and consumers can send and receive messages consisting of raw byte arrays and leave all type safety enforcement to the application on an "out-of-band" basis. +1. For the "server-side" approach, Pulsar has a built-in **schema registry** that enables clients to upload data schemas on a per-topic basis. Those schemas dictate which data types are recognized as valid for that topic. + +> The Pulsar schema registry is currently available only for the [Java client](client-libraries-java.md). + +## Basic architecture + +Schemas are automatically uploaded when you create a typed Producer with a Schema. Additionally, Schemas can be manually uploaded to, fetched from, and updated via Pulsar's {@inject: rest:REST:tag/schemas} API. + +> #### Other schema registry backends +> Out of the box, Pulsar uses the [Apache BookKeeper](concepts-architecture-overview#persistent-storage) log storage system for schema storage. You can, however, use different backends if you wish. Documentation for custom schema storage logic is coming soon. 
+ +## How schemas work + +Pulsar schemas are applied and enforced *at the topic level* (schemas cannot be applied at the namespace or tenant level). Producers and consumers upload schemas to Pulsar brokers. + +Pulsar schemas are fairly simple data structures that consist of: + +* A **name**. In Pulsar, a schema's name is the topic to which the schema is applied. +* A **payload**, which is a binary representation of the schema +* A schema [**type**](#supported-schema-formats) +* User-defined **properties** as a string/string map. Usage of properties is wholly application specific. Possible properties might be the Git hash associated with a schema, an environment like `dev` or `prod`, etc. + +## Schema versions + +In order to illustrate how schema versioning works, let's walk through an example. Imagine that the Pulsar [Java client](client-libraries-java.md) created using the code below attempts to connect to Pulsar and begin sending messages: + +```java +PulsarClient client = PulsarClient.builder() + .serviceUrl("pulsar://localhost:6650") + .build(); + +Producer producer = client.newProducer(JSONSchema.of(SensorReading.class)) + .topic("sensor-data") + .sendTimeout(3, TimeUnit.SECONDS) + .create(); +``` + +The table below lists the possible scenarios when this connection attempt occurs and what will happen in light of each scenario: + +Scenario | What happens +:--------|:------------ +No schema exists for the topic | The producer is created using the given schema. The schema is transmitted to the broker and stored (since no existing schema is "compatible" with the `SensorReading` schema). Any consumer created using the same schema/topic can consume messages from the `sensor-data` topic. +A schema already exists; the producer connects using the same schema that's already stored | The schema is transmitted to the Pulsar broker. The broker determines that the schema is compatible. 
The broker attempts to store the schema in [BookKeeper](concepts-architecture-overview.md#persistent-storage) but then determines that it's already stored, so it's then used to tag produced messages. +A schema already exists; the producer connects using a new schema that is compatible | The producer transmits the schema to the broker. The broker determines that the schema is compatible and stores the new schema as the current version (with a new version number). + +> Schemas are versioned in succession. Schema storage happens in the broker that handles the associated topic so that version assignments can be made. Once a version is assigned/fetched to/for a schema, all subsequent messages produced by that producer are tagged with the appropriate version. + + +## Supported schema formats + +The following formats are supported by the Pulsar schema registry: + +* None. If no schema is specified for a topic, producers and consumers will handle raw bytes. +* `String` (used for UTF-8-encoded strings) +* [JSON](https://www.json.org/) +* [Protobuf](https://developers.google.com/protocol-buffers/) +* [Avro](https://avro.apache.org/) + +For usage instructions, see the documentation for your preferred client library: + +* [Java](client-libraries-java.md#schemas) + +> Support for other schema formats will be added in future releases of Pulsar. + +## Managing Schemas + +You can use Pulsar's [admin tools](admin-api-schemas.md) for managing schemas for topics. \ No newline at end of file diff --git a/site2/docs/functions-api.md b/site2/docs/functions-api.md new file mode 100644 index 0000000000000..40efa2d397544 --- /dev/null +++ b/site2/docs/functions-api.md @@ -0,0 +1,720 @@ +--- +id: functions-api +title: The Pulsar Functions API +sidebar_label: API +--- + +[Pulsar Functions](functions-overview.md) provides an easy-to-use API that developers can use to create and manage processing logic for the Apache Pulsar messaging system. 
With Pulsar Functions, you can write functions of any level of complexity in [Java](#functions-for-java) or [Python](#functions-for-python) and run them in conjunction with a Pulsar cluster without needing to run a separate stream processing engine. + +> For a more in-depth overview of the Pulsar Functions feature, see the [Pulsar Functions overview](functions-overview.md). + +## Core programming model + +Pulsar Functions provide a wide range of functionality but are based on a very simple programming model. You can think of Pulsar Functions as lightweight processes that + +* consume messages from one or more Pulsar topics and then +* apply some user-defined processing logic to each incoming message. That processing logic could be just about anything you want, including + * producing the resulting, processed message on another Pulsar topic, or + * doing something else with the message, such as writing results to an external database. + +You could use Pulsar Functions, for example, to set up the following processing chain: + +* A [Python](#functions-for-python) function listens on the `raw-sentences` topic and "[sanitizes](#example-function)" incoming strings (removing extraneous whitespace and converting all characters to lower case) and then publishes the results to a `sanitized-sentences` topic +* A [Java](#functions-for-java) function listens on the `sanitized-sentences` topic, counts the number of times each word appears within a specified time window, and publishes the results to a `results` topic +* Finally, a Python function listens on the `results` topic and writes the results to a MySQL table + +### Example function + +Here's an example "input sanitizer" function written in Python and stored in a `sanitizer.py` file: + +```python +def clean_string(s): + return s.strip().lower() + +def process(input): + return clean_string(input) +``` + +Some things to note about this Pulsar Function: + +* There is no client, producer, or consumer object involved. 
All message "plumbing" is already taken care of for you, enabling you to worry only about processing logic. +* No topics, subscription types, tenants, or namespaces are specified in the function logic itself. Instead, topics are specified upon [deployment](#example-deployment). This means that you can use and re-use Pulsar Functions across topics, tenants, and namespaces without needing to hard-code those attributes. + +### Example deployment + +Deploying Pulsar Functions is handled by the [`pulsar-admin`](reference-pulsar-admin.md) CLI tool, in particular the [`functions`](reference-pulsar-admin.md#functions) command. Here's an example command that would run our [sanitizer](#example-function) function from above in [local run](functions-deploying.md#local-run-mode) mode: + +```bash +$ bin/pulsar-admin functions localrun \ + --py sanitizer.py \ # The Python file with the function's code + --classname sanitizer \ # The class or function holding the processing logic + --tenant public \ # The function's tenant (derived from the topic name by default) + --namespace default \ # The function's namespace (derived from the topic name by default) + --name sanitizer-function \ # The name of the function (the class name by default) + --inputs dirty-strings-in \ # The input topic(s) for the function + --output clean-strings-out \ # The output topic for the function + --log-topic sanitizer-logs # The topic to which all functions logs are published +``` + +For instructions on running functions in your Pulsar cluster, see the [Deploying Pulsar Functions](functions-deploying.md) guide. 
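
Because a language-native function involves no Pulsar objects, its logic can be exercised as plain Python before it is deployed. The sketch below repeats the `sanitizer.py` code from the example above and calls `process` directly; the sample inputs are made up for illustration.

```python
def clean_string(s):
    # Trim surrounding whitespace and lower-case the text.
    return s.strip().lower()


def process(input):
    return clean_string(input)


if __name__ == "__main__":
    # Hypothetical sample messages, standing in for data from the input topic.
    for raw in ["  Hello World  ", "PULSAR\n"]:
        print(repr(process(raw)))
```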
+ +### Available APIs + +In both Java and Python, you have two options for writing Pulsar Functions: + +Interface | Description | Use cases +:---------|:------------|:--------- +Language-native interface | No Pulsar-specific libraries or special dependencies required (only core libraries from Java/Python) | Functions that don't require access to the function's [context](#context) +Pulsar Function SDK for Java/Python | Pulsar-specific libraries that provide a range of functionality not provided by "native" interfaces | Functions that require access to the function's [context](#context) + +In Python, for example, this language-native function, which adds an exclamation point to all incoming strings and publishes the resulting string to a topic, would have no external dependencies: + +```python +def process(input): + return "{}!".format(input) +``` + +This function, however, would use the Pulsar Functions [SDK for Python](#python-sdk-functions): + +```python +from pulsar import Function + +class DisplayFunctionName(Function): + def process(self, input, context): + function_name = context.get_function_name() + return "The function processing this message has the name {0}".format(function_name) +``` + +### Functions, Messages and Message Types + +Pulsar Functions can take byte arrays as input and produce byte arrays as output. However, in languages that support typed interfaces (just Java at the moment), you can write typed functions as well. In this scenario, there are two ways to bind messages to types: +* [Schema Registry](#schema-registry) +* [SerDe](#serde) + +### Schema Registry +Pulsar has a built-in [Schema Registry](concepts-schema-registry.md) and comes bundled with a variety of popular schema types (Avro, JSON, and Protobuf). Pulsar Functions can leverage existing schema information from input topics to derive the input type. The same applies to the output topic. + +### SerDe + +SerDe stands for **Ser**ialization and **De**serialization. 
All Pulsar Functions use SerDe for message handling. How SerDe works by default depends on the language you're using for a particular function: + +* In [Python](#python-serde), the default SerDe is identity, meaning that the type is serialized as whatever type the producer function returns +* In [Java](#java-serde), a number of commonly used types (`String`s, `Integer`s, etc.) are supported by default + +In both languages, however, you can write your own custom SerDe logic for more complex, application-specific types. See the docs for [Java](#java-serde) and [Python](#python-serde) for language-specific instructions. + +### Context + +Both the [Java](#java-sdk-functions) and [Python](#python-sdk-functions) SDKs provide access to a **context object** that can be used by the function. This context object provides a wide variety of information and functionality to the function: + +* The name and ID of the Pulsar Function +* The message ID of each message. Each Pulsar message is automatically assigned an ID. 
+* The name of the topic on which the message was sent +* The names of all input topics as well as the output topic associated with the function +* The name of the class used for [SerDe](#serde) +* The [tenant](reference-terminology.md#tenant) and namespace associated with the function +* The ID of the Pulsar Functions instance running the function +* The version of the function +* The [logger object](functions-overview.md#logging) used by the function, which can be used to create function log messages +* Access to arbitrary [user config](#user-config) values supplied via the CLI +* An interface for recording [metrics](functions-metrics.md) +* An interface for storing and retrieving state in [state storage](functions-overview.md#state-storage) + +### User config + +When you run or update Pulsar Functions created using the [SDK](#available-apis), you can pass arbitrary key/values to them via the command line with the `--user-config` flag. Key/values must be specified as JSON. 
Here's an example of a function creation command that passes a user config key/value to a function: + +```bash +$ bin/pulsar-admin functions create \ + --name word-filter \ + # Other function configs + --user-config '{"forbidden-word":"rosebud"}' +``` + +If the function were a Python function, that config value could be accessed like this: + +```python +from pulsar import Function + +class WordFilter(Function): + def process(self, input, context): + forbidden_word = context.get_user_config_value("forbidden-word") + + # Don't publish the message if it contains the user-supplied + # forbidden word + if forbidden_word in input: + pass + # Otherwise publish the message + else: + return input +``` + +## Functions for Java + +Writing Pulsar Functions in Java involves implementing one of two interfaces: + +* The [`java.util.function.Function`](https://docs.oracle.com/javase/8/docs/api/java/util/function/Function.html) interface +* The {@inject: javadoc:Function:/pulsar-functions/org/apache/pulsar/functions/api/Function} interface. This interface works much like the `java.util.function.Function` interface, but with the important difference that it provides a {@inject: javadoc:Context:/pulsar-functions/org/apache/pulsar/functions/api/Context} object that you can use in a [variety of ways](#context) + +### Get started + +In order to write Pulsar Functions in Java, you'll need to install the proper [dependencies](#dependencies) and package your function [as a JAR](#packaging). + +#### Dependencies + +How you get started writing Pulsar Functions in Java depends on which API you're using: + +* If you're writing a [Java native function](#java-native-functions), you won't need any external dependencies. +* If you're writing a [Java SDK function](#java-sdk-functions), you'll need to import the `pulsar-functions-api` library. 
+ + Here's an example for a Maven `pom.xml` configuration file: + + ```xml + <dependency> + <groupId>org.apache.pulsar</groupId> + <artifactId>pulsar-functions-api</artifactId> + <version>2.1.1-incubating</version> + </dependency> + ``` + + Here's an example for a Gradle `build.gradle` configuration file: + + ```groovy + dependencies { + compile group: 'org.apache.pulsar', name: 'pulsar-functions-api', version: '2.1.1-incubating' + } + ``` + +#### Packaging + +Whether you're writing Java Pulsar Functions using the [native](#java-native-functions) Java `java.util.function.Function` interface or using the [Java SDK](#java-sdk-functions), you'll need to package your function(s) as a "fat" JAR. + +> #### Starter repo +> If you'd like to get up and running quickly, you can use [this repo](https://github.com/streamlio/pulsar-functions-java-starter), which contains the necessary Maven configuration to build a fat JAR as well as some example functions. + +### Java native functions + +If your function doesn't require access to its [context](#context), you can create a Pulsar Function by implementing the [`java.util.function.Function`](https://docs.oracle.com/javase/8/docs/api/java/util/function/Function.html) interface, which has this very simple, single-method signature: + +```java +public interface Function<I, O> { + O apply(I input); +} +``` + +Here's an example function that takes a string as its input, adds an exclamation point to the end of the string, and then publishes the resulting string: + +```java +import java.util.function.Function; + +public class ExclamationFunction implements Function<String, String> { + @Override + public String apply(String input) { + return String.format("%s!", input); + } +} +``` + +In general, you should use native functions when you don't need access to the function's [context](#context). If you *do* need access to the function's context, then we recommend using the [Pulsar Functions Java SDK](#java-sdk-functions). 
+ +#### Java native examples + +There is one example Java native function in this {@inject: github:folder:/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples}: + +* {@inject: github:`JavaNativeExclamationFunction`:/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/JavaNativeExclamationFunction.java} + +### Java SDK functions + +To get started developing Pulsar Functions using the Java SDK, you'll need to add a dependency on the `pulsar-functions-api` artifact to your project. Instructions can be found [above](#dependencies). + +> An easy way to get up and running with Pulsar Functions in Java is to clone the [`pulsar-functions-java-starter`](https://github.com/streamlio/pulsar-functions-java-starter) repo and follow the instructions there. + + +#### Java SDK examples + +There are several example Java SDK functions in this {@inject: github:folder:/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples}: + +Function name | Description +:-------------|:----------- +[`ContextFunction`](https://github.com/apache/pulsar/blob/master/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ContextFunction.java) | Illustrates [context](#context)-specific functionality like [logging](#java-logging) and [metrics](#java-metrics) +[`WordCountFunction`](https://github.com/apache/pulsar/blob/master/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WordCountFunction.java) | Illustrates usage of Pulsar Function [state-storage](functions-overview.md#state-storage) +[`ExclamationFunction`](https://github.com/apache/pulsar/blob/master/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ExclamationFunction.java) | A basic string manipulation function for the Java SDK 
+[`LoggingFunction`](https://github.com/apache/pulsar/blob/master/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/LoggingFunction.java) | A function that shows how [logging](#java-logging) works for Java +[`PublishFunction`](https://github.com/apache/pulsar/blob/master/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java) | Publishes results to a topic specified in the function's [user config](#java-user-config) (rather than on the function's output topic) +[`UserConfigFunction`](https://github.com/apache/pulsar/blob/master/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserConfigFunction.java) | A function that consumes [user-supplied configuration](#java-user-config) values +[`UserMetricFunction`](https://github.com/apache/pulsar/blob/master/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserMetricFunction.java) | A function that records metrics +[`VoidFunction`](https://github.com/apache/pulsar/blob/master/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/VoidFunction.java) | A simple [void function](#void-functions) + +### Java context object + +The {@inject: javadoc:Context:/client/org/apache/pulsar/functions/api/Context} interface provides a number of methods that you can use to access the function's [context](#context). 
The various method signatures for the `Context` interface are listed below: + +```java +public interface Context { + Record<?> getCurrentRecord(); + Collection<String> getInputTopics(); + String getOutputTopic(); + String getOutputSchemaType(); + String getTenant(); + String getNamespace(); + String getFunctionName(); + String getFunctionId(); + String getInstanceId(); + String getFunctionVersion(); + Logger getLogger(); + void incrCounter(String key, long amount); + long getCounter(String key); + void putState(String key, ByteBuffer value); + ByteBuffer getState(String key); + Map<String, Object> getUserConfigMap(); + Optional<Object> getUserConfigValue(String key); + Object getUserConfigValueOrDefault(String key, Object defaultValue); + void recordMetric(String metricName, double value); + <O> CompletableFuture<Void> publish(String topicName, O object, String schemaOrSerdeClassName); + <O> CompletableFuture<Void> publish(String topicName, O object); +} +``` + +Here's an example function that uses several methods available via the `Context` object: + +```java +import org.apache.pulsar.functions.api.Context; +import org.apache.pulsar.functions.api.Function; +import org.slf4j.Logger; + +import java.util.stream.Collectors; + +public class ContextFunction implements Function<String, Void> { + public Void process(String input, Context context) { + Logger LOG = context.getLogger(); + String inputTopics = context.getInputTopics().stream().collect(Collectors.joining(", ")); + String functionName = context.getFunctionName(); + + String logMessage = String.format("A message with a value of \"%s\" has arrived on one of the following topics: %s\n", + input, + inputTopics); + + LOG.info(logMessage); + + String metricName = String.format("function-%s-messages-received", functionName); + context.recordMetric(metricName, 1); + + return null; + } +} +``` + +### Void functions + +Pulsar Functions can publish results to an output topic, but this isn't required. You can also have functions that simply produce a log, write results to a database, etc. 
Here's a function that writes a simple log every time a message is received: + +```java +import org.apache.pulsar.functions.api.Context; +import org.apache.pulsar.functions.api.Function; +import org.slf4j.Logger; + +public class LogFunction implements Function<String, Void> { + public Void process(String input, Context context) { + Logger LOG = context.getLogger(); + LOG.info("The following message was received: {}", input); + return null; + } +} +``` + +> When using Java functions in which the output type is `Void`, the function must *always* return `null`. + +### Java SerDe + +Pulsar Functions use [SerDe](#serde) when publishing data to and consuming data from Pulsar topics. When you're writing Pulsar Functions in Java, the following basic Java types are built in and supported by default: + +* `String` +* `Double` +* `Integer` +* `Float` +* `Long` +* `Short` +* `Byte` + +For types beyond these built-ins, you can provide custom SerDe logic by implementing this interface: + +```java +public interface SerDe<T> { + T deserialize(byte[] input); + byte[] serialize(T input); +} +``` + +#### Java SerDe example + +Imagine that you're writing Pulsar Functions in Java that are processing tweet objects. Here's a simple example `Tweet` class: + +```java +public class Tweet { + private String username; + private String tweetContent; + + public Tweet(String username, String tweetContent) { + this.username = username; + this.tweetContent = tweetContent; + } + + // Standard setters and getters +} +``` + +In order to be able to pass `Tweet` objects directly between Pulsar Functions, you'll need to provide a custom SerDe class. In the example below, `Tweet` objects are basically strings in which the username and tweet content are separated by a `|`. 
+ +```java +package com.example.serde; + +import org.apache.pulsar.functions.api.SerDe; + +import java.util.regex.Pattern; + +public class TweetSerde implements SerDe<Tweet> { + public Tweet deserialize(byte[] input) { + String s = new String(input); + String[] fields = s.split(Pattern.quote("|")); + return new Tweet(fields[0], fields[1]); + } + + public byte[] serialize(Tweet input) { + return String.format("%s|%s", input.getUsername(), input.getTweetContent()).getBytes(); + } +} +``` + +To apply this custom SerDe to a particular Pulsar Function, you would need to: + +* Package the `Tweet` and `TweetSerde` classes into a JAR +* Specify a path to the JAR and SerDe class name when deploying the function + +Here's an example [`create`](reference-pulsar-admin.md#create-1) operation: + +```bash +$ bin/pulsar-admin functions create \ + --jar /path/to/your.jar \ + --output-serde-classname com.example.serde.TweetSerde \ + # Other function attributes +``` + +> #### Custom SerDe classes must be packaged with your function JARs +> Pulsar does not store your custom SerDe classes separately from your Pulsar Functions. That means that you'll need to always include your SerDe classes in your function JARs. If not, Pulsar will return an error. + +### Java logging + +Pulsar Functions that use the [Java SDK](#java-sdk-functions) have access to an [SLF4j](https://www.slf4j.org/) [`Logger`](https://www.slf4j.org/api/org/slf4j/Logger.html) object that can be used to produce logs at the chosen log level. 
Here's a simple example function that logs either a `WARN`- or `INFO`-level log based on whether the incoming string contains the word `danger`: + +```java +import org.apache.pulsar.functions.api.Context; +import org.apache.pulsar.functions.api.Function; +import org.slf4j.Logger; + +public class LoggingFunction implements Function<String, Void> { + @Override + public Void process(String input, Context context) { + Logger LOG = context.getLogger(); + String messageId = new String(context.getMessageId()); + + if (input.contains("danger")) { + LOG.warn("A warning was received in message {}", messageId); + } else { + LOG.info("Message {} received\nContent: {}", messageId, input); + } + + return null; + } +} +``` + +If you want your function to produce logs, you need to specify a log topic when creating or running the function. Here's an example: + +```bash +$ bin/pulsar-admin functions create \ + --jar my-functions.jar \ + --classname my.package.LoggingFunction \ + --log-topic persistent://public/default/logging-function-logs \ + # Other function configs +``` + +Now, all logs produced by the `LoggingFunction` above can be accessed via the `persistent://public/default/logging-function-logs` topic. + +### Java user config + +The Java SDK's [`Context`](#context) object enables you to access key/value pairs provided to the Pulsar Function via the command line (as JSON). 
Here's an example function creation command that passes a key/value pair: + +```bash +$ bin/pulsar-admin functions create \ + # Other function configs + --user-config '{"word-of-the-day":"verdure"}' +``` + +To access that value in a Java function: + +```java +import org.apache.pulsar.functions.api.Context; +import org.apache.pulsar.functions.api.Function; +import org.slf4j.Logger; + +import java.util.Optional; + +public class UserConfigFunction implements Function<String, Void> { + @Override + public Void process(String input, Context context) { + Logger LOG = context.getLogger(); + Optional<Object> wotd = context.getUserConfigValue("word-of-the-day"); + if (wotd.isPresent()) { + LOG.info("The word of the day is {}", wotd.get()); + } else { + LOG.warn("No word of the day provided"); + } + return null; + } +} +``` + +The `UserConfigFunction` function will log the string `"The word of the day is verdure"` every time the function is invoked (i.e. every time a message arrives). The `word-of-the-day` user config will be changed only when the function is updated with a new config value via the command line. + +You can also access the entire user config map or set a default value in case no value is present: + +```java +// Get the whole config map +Map<String, Object> allConfigs = context.getUserConfigMap(); + +// Get value or resort to default +String wotd = (String) context.getUserConfigValueOrDefault("word-of-the-day", "perspicacious"); +``` + +> For all key/value pairs passed to Java Pulsar Functions, both the key *and* the value are `String`s. If you'd like the value to be of a different type, you will need to deserialize from the `String` type. + +### Java metrics + +You can record metrics using the [`Context`](#context) object on a per-key basis. You can, for example, set a metric for the key `hit-count` and a different metric for the key `elevens-count` every time the function processes a message. 
Here's an example: + +```java +import org.apache.pulsar.functions.api.Context; +import org.apache.pulsar.functions.api.Function; + +public class MetricRecorderFunction implements Function<Integer, Void> { + @Override + public Void process(Integer input, Context context) { + // Records the metric 1 every time a message arrives + context.recordMetric("hit-count", 1); + + // Records the metric only if the arriving number equals 11 + if (input == 11) { + context.recordMetric("elevens-count", 1); + } + + return null; + } +} +``` + +> For instructions on reading and using metrics, see the [Monitoring](deploy-monitoring.md) guide. + + +## Functions for Python + +Writing Pulsar Functions in Python entails implementing one of two things: + +* A `process` function that takes an input (message data from the function's input topic(s)), applies some kind of logic to it, and either returns an object (to be published to the function's output topic) or `pass`es and thus doesn't produce a message +* A `Function` class that has a `process` method that receives the message input and a [context](#context) object + +### Get started + +Regardless of which [deployment mode](functions-deploying.md) you're using, the `pulsar-client` Python library has to be installed on any machine that's running Pulsar Functions written in Python. + +That could be your local machine for [local run mode](functions-deploying.md#local-run-mode) or a machine running a Pulsar [broker](reference-terminology.md#broker) for [cluster mode](functions-deploying.md#cluster-mode). To install the library using pip: + +```bash +$ pip install pulsar-client +``` + +### Packaging + +At the moment, the code for Pulsar Functions written in Python must be contained within a single Python file. In the future, Pulsar Functions may support other packaging formats, such as [**P**ython **EX**ecutables](https://github.com/pantsbuild/pex) (PEXes). 
+ +### Python native functions + +If your function doesn't require access to its [context](#context), you can create a Pulsar Function by implementing a `process` function, which receives a single input object that you can process however you wish. Here's an example function that takes a string as its input, adds an exclamation point at the end of the string, and then publishes the resulting string: + +```python +def process(input): + return "{0}!".format(input) +``` + +In general, you should use native functions when you don't need access to the function's [context](#context). If you *do* need access to the function's context, then we recommend using the [Pulsar Functions Python SDK](#python-sdk-functions). + +#### Python native examples + +There is one example Python native function in this {@inject: github:folder:/pulsar-functions/python-examples}: + +* {@inject: github:`native_exclamation_function.py`:/pulsar-functions/python-examples/native_exclamation_function.py} + +### Python SDK functions + +To get started developing Pulsar Functions using the Python SDK, you'll need to install the [`pulsar-client`](/api/python) library using the instructions [above](#get-started). 
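
To make the SDK route concrete, here is a minimal sketch of a class-based function: it subclasses `Function` and implements `process(self, input, context)`, using the context only to obtain a logger. The `try`/`except` fallback base class and the `FakeContext` test double are assumptions added purely so the snippet runs on a machine without `pulsar-client` or a Pulsar cluster; they are not part of the SDK.

```python
import logging

try:
    from pulsar import Function  # provided by the pulsar-client library
except ImportError:
    # Stand-in base class (an assumption for illustration) so the sketch
    # still runs where pulsar-client is not installed.
    class Function(object):
        pass


class ExclamationFunction(Function):
    def process(self, input, context):
        # Log through the context's logger, then return the output value,
        # which the runtime would publish to the function's output topic.
        context.get_logger().info("Processing: {0}".format(input))
        return "{0}!".format(input)


class FakeContext(object):
    """Hypothetical test double; the real Context is supplied by the runtime."""

    def get_logger(self):
        return logging.getLogger("exclamation")


if __name__ == "__main__":
    print(ExclamationFunction().process("hello", FakeContext()))
```

In a real deployment only the `ExclamationFunction` class would live in the function's Python file; the runtime creates the instance and passes in its own context object.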
+ +#### Python SDK examples + +There are several example Python functions in this {@inject: github:folder:/pulsar-functions/python-examples}: + +Function file | Description +:-------------|:----------- +[`exclamation_function.py`](https://github.com/apache/pulsar/blob/master/pulsar-functions/python-examples/exclamation_function.py) | Adds an exclamation point at the end of each incoming string +[`logging_function.py`](https://github.com/apache/pulsar/blob/master/pulsar-functions/python-examples/logging_function.py) | Logs each incoming message +[`thumbnailer.py`](https://github.com/apache/pulsar/blob/master/pulsar-functions/python-examples/thumbnailer.py) | Takes image data as input and outputs a 128x128 thumbnail of each image + +#### Python context object + +The [`Context`](https://github.com/apache/pulsar/blob/master/pulsar-client-cpp/python/pulsar/functions/context.py) class provides a number of methods that you can use to access the function's [context](#context). The various methods for the `Context` class are listed below: + +Method | What it provides +:------|:---------------- +`get_message_id` | The message ID of the message being processed +`get_current_message_topic_name` | The topic of the message currently being processed +`get_function_tenant` | The tenant under which the current Pulsar Function runs +`get_function_namespace` | The namespace under which the current Pulsar Function runs +`get_function_name` | The name of the current Pulsar Function +`get_function_id` | The ID of the current Pulsar Function +`get_instance_id` | The ID of the current Pulsar Functions instance +`get_function_version` | The version of the current Pulsar Function +`get_logger` | A logger object that can be used for [logging](#python-logging) +`get_user_config_value` | Returns the value of a [user-defined config](#python-user-config) (or `None` if the config doesn't exist) +`get_user_config_map` | Returns the entire user-defined config as a dict 
+`record_metric` | Records a per-key [metric](#python-metrics) +`publish` | Publishes a message to the specified Pulsar topic +`get_output_serde_class_name` | The name of the output [SerDe](#python-serde) class +`ack` | [Acks](reference-terminology.md#acknowledgment-ack) the message being processed to Pulsar + +### Python SerDe + +Pulsar Functions use [SerDe](#serialization-and-deserialization-serde) when publishing data to and consuming data from Pulsar topics (this is true of both [native](#python-native-functions) functions and [SDK](#python-sdk-functions) functions). You can specify the SerDe when [creating](functions-deploying.md#cluster-mode) or [running](functions-deploying.md#local-run-mode) functions. Here's an example: + +```bash +$ bin/pulsar-admin functions create \ + --tenant public \ + --namespace default \ + --name my_function \ + --py my_function.py \ + --classname my_function.MyFunction \ + --custom-serde-inputs '{"input-topic-1":"Serde1","input-topic-2":"Serde2"}' \ + --output-serde-classname Serde3 \ + --output output-topic-1 +``` + +In this case, there are two input topics, `input-topic-1` and `input-topic-2`, each of which is mapped to a different SerDe class (the map must be specified as a JSON string). The output topic, `output-topic-1`, uses the `Serde3` class for SerDe. At the moment, all Pulsar Function logic, including the processing function and SerDe classes, must be contained within a single Python file. + +When using Pulsar Functions for Python, you essentially have three SerDe options: + +1. You can use the [`IdentitySerDe`](https://github.com/apache/pulsar/blob/master/pulsar-client-cpp/python/pulsar/functions/serde.py#L70), which leaves the data unchanged. The `IdentitySerDe` is the **default**. If you create or run a function without explicitly specifying a SerDe, this option is used. +2. 
You can use the [`PickleSerDe`](https://github.com/apache/pulsar/blob/master/pulsar-client-cpp/python/pulsar/functions/serde.py#L62), which uses Python's [`pickle`](https://docs.python.org/3/library/pickle.html) for SerDe. +3. You can create a custom SerDe class by implementing the baseline [`SerDe`](https://github.com/apache/pulsar/blob/master/pulsar-client-cpp/python/pulsar/functions/serde.py#L50) class, which has just two methods: [`serialize`](https://github.com/apache/pulsar/blob/master/pulsar-client-cpp/python/pulsar/functions/serde.py#L53) for converting the object into bytes, and [`deserialize`](https://github.com/apache/pulsar/blob/master/pulsar-client-cpp/python/pulsar/functions/serde.py#L58) for converting bytes into an object of the required application-specific type. + +The table below shows when you should use each SerDe: + +SerDe option | When to use +:------------|:----------- +`IdentitySerDe` | When you're working with simple types like strings, Booleans, integers, and the like +`PickleSerDe` | When you're working with complex, application-specific types and are comfortable with `pickle`'s "best effort" approach +Custom SerDe | When you require explicit control over SerDe, potentially for performance or data compatibility purposes + +#### Python SerDe example + +Imagine that you're writing Pulsar Functions in Python that are processing tweet objects. Here's a simple `Tweet` class: + +```python +class Tweet(object): + def __init__(self, username, tweet_content): + self.username = username + self.tweet_content = tweet_content +``` + +In order to use this class in Pulsar Functions, you'd have two options: + +1. You could specify `PickleSerDe`, which would apply the [`pickle`](https://docs.python.org/3/library/pickle.html) library's SerDe +1. You could create your own SerDe class. 
Here's a simple example: + + ```python + from pulsar import SerDe + + class TweetSerDe(SerDe): + def serialize(self, input): + # `input` is the Tweet object to convert into bytes + return "{0}|{1}".format(input.username, input.tweet_content).encode('utf-8') + + def deserialize(self, input_bytes): + # Split on the first '|' only, so tweet content may itself contain '|' + tweet_components = input_bytes.decode('utf-8').split('|', 1) + return Tweet(tweet_components[0], tweet_components[1]) + ``` + +### Python logging + +Pulsar Functions that use the [Python SDK](#python-sdk-functions) have access to a logging object that can be used to produce logs at the chosen log level. Here's a simple example function that logs either a `WARNING`- or `INFO`-level log based on whether the incoming string contains the word `danger`: + +```python +from pulsar import Function + +class LoggingFunction(Function): + def process(self, input, context): + logger = context.get_logger() + msg_id = context.get_message_id() + if 'danger' in input: + logger.warn("A warning was received in message {0}".format(msg_id)) + else: + logger.info("Message {0} received\nContent: {1}".format(msg_id, input)) +``` + +If you want your function to produce logs on a Pulsar topic, you need to specify a **log topic** when creating or running the function. Here's an example: + +```bash +$ bin/pulsar-admin functions create \ + --py logging_function.py \ + --classname logging_function.LoggingFunction \ + --log-topic logging-function-logs \ + # Other function configs +``` + +Now, all logs produced by the `LoggingFunction` above can be accessed via the `logging-function-logs` topic. + +### Python user config + +The Python SDK's [`Context`](#context) object enables you to access key/value pairs provided to the Pulsar Function via the command line (as JSON). 
Here's an example function creation command that passes a key/value pair: + +```bash +$ bin/pulsar-admin functions create \ + # Other function configs \ + --user-config '{"word-of-the-day":"verdure"}' +``` + +To access that value in a Python function: + +```python +from pulsar import Function + +class UserConfigFunction(Function): + def process(self, input, context): + logger = context.get_logger() + wotd = context.get_user_config_value('word-of-the-day') + if wotd is None: + logger.warn('No word of the day provided') + else: + logger.info("The word of the day is {0}".format(wotd)) +``` + +### Python metrics + +You can record metrics using the [`Context`](#context) object on a per-key basis. You can, for example, set a metric for the key `process-count` and a different metric for the key `elevens-count` every time the function processes a message. Here's an example: + +```python +from pulsar import Function + +class MetricRecorderFunction(Function): + def process(self, input, context): + context.record_metric('hit-count', 1) + + if input == 11: + context.record_metric('elevens-count', 1) +``` \ No newline at end of file diff --git a/site2/docs/functions-debugging.md b/site2/docs/functions-debugging.md new file mode 100644 index 0000000000000..94c3978acdbb1 --- /dev/null +++ b/site2/docs/functions-debugging.md @@ -0,0 +1,126 @@ +--- +id: functions-debugging +title: How to debug Pulsar Functions +sidebar_label: Debugging +--- + + +## Use unit test + +A Pulsar Function at its core is just a function with inputs and outputs, thus testing a Pulsar Function can be done in a similar way as testing any function. 
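
The same idea carries over to Python native functions. As a minimal sketch, the snippet below defines the exclamation `process` function and checks it with plain assertions; the test function name is made up for illustration, and a test runner such as `pytest` is optional rather than required.

```python
def process(input):
    # Native Pulsar Function: appends an exclamation point to the input
    return "{0}!".format(input)


def test_process_appends_exclamation_point():
    # No Pulsar cluster is needed: the function is called like any other
    assert process("foo") == "foo!"
    assert process("") == "!"


if __name__ == "__main__":
    test_process_appends_exclamation_point()
    print("all checks passed")
```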
+ +For example, if a user has the following Pulsar Function: + +```java +import java.util.function.Function; + +public class JavaNativeExclamationFunction implements Function<String, String> { + @Override + public String apply(String input) { + return String.format("%s!", input); + } +} +``` + +The user can write a simple unit test to test this Pulsar function: + +```java +@Test +public void testJavaNativeExclamationFunction() { + JavaNativeExclamationFunction exclamation = new JavaNativeExclamationFunction(); + String output = exclamation.apply("foo"); + Assert.assertEquals(output, "foo!"); +} +``` + +Similarly, if a user has a Pulsar Function that implements the `org.apache.pulsar.functions.api.Function` interface: + +```java +import org.apache.pulsar.functions.api.Context; +import org.apache.pulsar.functions.api.Function; + +public class ExclamationFunction implements Function<String, String> { + @Override + public String process(String input, Context context) { + return String.format("%s!", input); + } +} +``` + +The user can write a unit test for this function as well. Remember to mock out the `Context` parameter. + +For example: + +```java +@Test +public void testExclamationFunction() { + ExclamationFunction exclamation = new ExclamationFunction(); + String output = exclamation.process("foo", mock(Context.class)); + Assert.assertEquals(output, "foo!"); +} +``` + +## Debug with localrun mode + +> Note +> +> Currently, debugging with localrun mode only supports Pulsar Functions written in Java. Users need Pulsar version 2.4.0 or later to do the following. Even though localrun is available in versions earlier than Pulsar 2.4.0, it does not have the functionality to be executed programmatically and run Functions as threads. + +To test in a more realistic fashion, a Pulsar Function can be run via localrun mode, which launches an instance of the Function on your local machine as a thread. 
+ +In this mode, the Pulsar Function can consume and produce actual data to a Pulsar cluster, mirroring how the function will actually run in a Pulsar cluster. + +Users can launch their function in the following manner: + +```java +FunctionConfig functionConfig = new FunctionConfig(); +functionConfig.setName(functionName); +functionConfig.setInputs(Collections.singleton(sourceTopic)); +functionConfig.setClassName(ExclamationFunction.class.getName()); +functionConfig.setRuntime(FunctionConfig.Runtime.JAVA); +functionConfig.setOutput(sinkTopic); + +LocalRunner localRunner = LocalRunner.builder().functionConfig(functionConfig).build(); +localRunner.start(true); +``` + +This allows users to easily debug functions using an IDE. Users can set breakpoints and manually step through a function to debug with real data. + +The following code snippet is a more complete example of how to programmatically launch a function in localrun mode. + +```java +public class ExclamationFunction implements Function<String, String> { + + @Override + public String process(String s, Context context) throws Exception { + return s + "!"; + } + + public static void main(String[] args) throws Exception { + FunctionConfig functionConfig = new FunctionConfig(); + functionConfig.setName("exclamation"); + functionConfig.setInputs(Collections.singleton("input")); + functionConfig.setClassName(ExclamationFunction.class.getName()); + functionConfig.setRuntime(FunctionConfig.Runtime.JAVA); + functionConfig.setOutput("output"); + + LocalRunner localRunner = LocalRunner.builder().functionConfig(functionConfig).build(); + localRunner.start(false); + } +} +``` + +To use localrun programmatically as shown above, add the following dependency: + +```xml +<dependency> + <groupId>org.apache.pulsar</groupId> + <artifactId>pulsar-functions-local-runner</artifactId> + <version>${pulsar.version}</version> +</dependency> + +``` + +For complete code samples, see [here](https://github.com/jerrypeng/pulsar-functions-demos/tree/master/debugging). + +In the future, debugging with localrun mode for Pulsar Functions 
written in other languages will be supported. \ No newline at end of file diff --git a/site2/docs/functions-deploying.md b/site2/docs/functions-deploying.md new file mode 100644 index 0000000000000..3c5bf9df738a3 --- /dev/null +++ b/site2/docs/functions-deploying.md @@ -0,0 +1,228 @@ +--- +id: functions-deploying +title: Deploying and managing Pulsar Functions +sidebar_label: Deploying functions +--- + +At the moment, there are two deployment modes available for Pulsar Functions: + +Mode | Description +:----|:----------- +Local run mode | The function runs in your local environment, for example on your laptop +Cluster mode | The function runs *inside of* your Pulsar cluster, on the same machines as your Pulsar brokers + +> #### Contributing new deployment modes +> The Pulsar Functions feature was designed, however, with extensibility in mind. Other deployment options will be available in the future. If you'd like to add a new deployment option, we recommend getting in touch with the Pulsar developer community at [dev@pulsar.apache.org](mailto:dev@pulsar.apache.org). + +## Requirements + +In order to deploy and manage Pulsar Functions, you need to have a Pulsar cluster running. There are several options for this: + +* You can run a [standalone cluster](getting-started-standalone.md) locally on your own machine +* You can deploy a Pulsar cluster on [Kubernetes](deploy-kubernetes.md), [Amazon Web Services](deploy-aws.md), [bare metal](deploy-bare-metal.md), [DC/OS](deploy-dcos.md), and more + +If you're running a non-[standalone](reference-terminology.md#standalone) cluster, you'll need to obtain the service URL for the cluster. How you obtain the service URL will depend on how you deployed your Pulsar cluster. + +If you're going to deploy and trigger python user-defined functions, you should install [the pulsar python client](http://pulsar.apache.org/docs/en/client-libraries-python/) first. 
+ +## Command-line interface + +Pulsar Functions are deployed and managed using the [`pulsar-admin functions`](reference-pulsar-admin.md#functions) interface, which contains commands such as [`create`](reference-pulsar-admin.md#functions-create) for deploying functions in [cluster mode](#cluster-mode), [`trigger`](reference-pulsar-admin.md#trigger) for [triggering](#triggering-pulsar-functions) functions, [`list`](reference-pulsar-admin.md#list-2) for listing deployed functions, and several others. + +### Fully Qualified Function Name (FQFN) + +Each Pulsar Function has a **Fully Qualified Function Name** (FQFN) that consists of three elements: the function's tenant, namespace, and function name. FQFN's look like this: + +```http +tenant/namespace/name +``` + +FQFNs enable you to, for example, create multiple functions with the same name provided that they're in different namespaces. + +### Default arguments + +When managing Pulsar Functions, you'll need to specify a variety of information about those functions, including tenant, namespace, input and output topics, etc. There are some parameters, however, that have default values that will be supplied if omitted. The table below lists the defaults: + +Parameter | Default +:---------|:------- +Function name | Whichever value is specified for the class name (minus org, library, etc.). The flag `--classname org.example.MyFunction`, for example, would give the function a name of `MyFunction`. +Tenant | Derived from the input topics' names. If the input topics are under the `marketing` tenant---i.e. the topic names have the form `persistent://marketing/{namespace}/{topicName}`---then the tenant will be `marketing`. +Namespace | Derived from the input topics' names. If the input topics are under the `asia` namespace under the `marketing` tenant---i.e. the topic names have the form `persistent://marketing/asia/{topicName}`, then the namespace will be `asia`. +Output topic | `{input topic}-{function name}-output`. 
A function with an input topic name of `incoming` and a function name of `exclamation`, for example, would have an output topic of `incoming-exclamation-output`. +Subscription type | For at-least-once and at-most-once [processing guarantees](functions-guarantees.md), the [`SHARED`](concepts-messaging.md#shared) subscription type is applied by default; for effectively-once guarantees, [`FAILOVER`](concepts-messaging.md#failover) is applied +Processing guarantees | [`ATLEAST_ONCE`](functions-guarantees.md) +Pulsar service URL | `pulsar://localhost:6650` + +#### Example use of defaults + +Take this `create` command: + +```bash +$ bin/pulsar-admin functions create \ + --jar my-pulsar-functions.jar \ + --classname org.example.MyFunction \ + --inputs my-function-input-topic1,my-function-input-topic2 +``` + +The created function would have default values supplied for the function name (`MyFunction`), tenant (`public`), namespace (`default`), subscription type (`SHARED`), processing guarantees (`ATLEAST_ONCE`), and Pulsar service URL (`pulsar://localhost:6650`). + +## Local run mode + +If you run a Pulsar Function in **local run** mode, it will run on the machine from which the command is run (this could be your laptop, an [AWS EC2](https://aws.amazon.com/ec2/) instance, etc.). Here's an example [`localrun`](reference-pulsar-admin.md#localrun) command: + +```bash +$ bin/pulsar-admin functions localrun \ + --py myfunc.py \ + --classname myfunc.SomeFunction \ + --inputs persistent://public/default/input-1 \ + --output persistent://public/default/output-1 +``` + +By default, the function will connect to a Pulsar cluster running on the same machine, via a local [broker](reference-terminology.md#broker) service URL of `pulsar://localhost:6650`. If you'd like to use local run mode to run a function but connect it to a non-local Pulsar cluster, you can specify a different broker URL using the `--broker-service-url` flag. 
Here's an example: + +```bash +$ bin/pulsar-admin functions localrun \ + --broker-service-url pulsar://my-cluster-host:6650 \ + # Other function parameters +``` + +## Cluster mode + +When you run a Pulsar Function in **cluster mode**, the function code will be uploaded to a Pulsar broker and run *alongside the broker* rather than in your [local environment](#local-run-mode). You can run a function in cluster mode using the [`create`](reference-pulsar-admin.md#create-1) command. Here's an example: + +```bash +$ bin/pulsar-admin functions create \ + --py myfunc.py \ + --classname myfunc.SomeFunction \ + --inputs persistent://public/default/input-1 \ + --output persistent://public/default/output-1 +``` + +### Updating cluster mode functions + +You can use the [`update`](reference-pulsar-admin.md#update-1) command to update a Pulsar Function running in cluster mode. This command, for example, would update the function created in the section [above](#cluster-mode): + +```bash +$ bin/pulsar-admin functions update \ + --py myfunc.py \ + --classname myfunc.SomeFunction \ + --inputs persistent://public/default/new-input-topic \ + --output persistent://public/default/new-output-topic +``` + +### Parallelism + +Pulsar Functions run as processes called **instances**. When you run a Pulsar Function, it runs as a single instance by default (and in [local run mode](#local-run-mode) you can *only* run a single instance of a function). + +You can also specify the *parallelism* of a function, i.e. the number of instances to run, when you create the function. You can set the parallelism factor using the `--parallelism` flag of the [`create`](reference-pulsar-admin.md#functions-create) command. Here's an example: + +```bash +$ bin/pulsar-admin functions create \ + --parallelism 3 \ + # Other function info +``` + +You can adjust the parallelism of an already created function using the [`update`](reference-pulsar-admin.md#update-1) interface. 
+ +```bash +$ bin/pulsar-admin functions update \ + --parallelism 5 \ + # Other function +``` + +If you're specifying a function's configuration via YAML, use the `parallelism` parameter. Here's an example config file: + +```yaml +# function-config.yaml +parallelism: 3 +inputs: +- persistent://public/default/input-1 +output: persistent://public/default/output-1 +# other parameters +``` + +And here's the corresponding update command: + +```bash +$ bin/pulsar-admin functions update \ + --function-config-file function-config.yaml +``` + +### Function instance resources + +When you run Pulsar Functions in [cluster run](#cluster-mode) mode, you can specify the resources that are assigned to each function [instance](#parallelism): + +Resource | Specified as... | Runtimes +:--------|:----------------|:-------- +CPU | The number of cores | Docker (coming soon) +RAM | The number of bytes | Process, Docker +Disk space | The number of bytes | Docker + +Here's an example function creation command that allocates 8 cores, 8 GB of RAM, and 10 GB of disk space to a function: + +```bash +$ bin/pulsar-admin functions create \ + --jar target/my-functions.jar \ + --classname org.example.functions.MyFunction \ + --cpu 8 \ + --ram 8589934592 \ + --disk 10737418240 +``` + +> #### Resources are *per instance* +> The resources that you apply to a given Pulsar Function are applied to each [instance](#parallelism) of the function. If you apply 8 GB of RAM to a function with a parallelism of 5, for example, then you are applying 40 GB of RAM total for the function. You should always make sure to factor parallelism---i.e. the number of instances---into your resource calculations + +## Triggering Pulsar Functions + +If a Pulsar Function is running in [cluster mode](#cluster-mode), you can **trigger** it at any time using the command line. Triggering a function means that you send a message with a specific value to the function and get the function's output (if any) via the command line. 
+ +> Triggering a function is ultimately no different from invoking a function by producing a message on one of the function's input topics. The [`pulsar-admin functions trigger`](reference-pulsar-admin.md#trigger) command is essentially a convenient mechanism for sending messages to functions without needing to use the [`pulsar-client`](reference-cli-tools.md#pulsar-client) tool or a language-specific client library. + +To show an example of function triggering, let's start with a simple [Python function](functions-api.md#functions-for-python) that returns a simple string based on the input: + +```python +# myfunc.py +def process(input): + return "This function has been triggered with a value of {0}".format(input) +``` + +Let's run that function in [cluster mode](functions-deploying.md#cluster-mode): + +```bash +$ bin/pulsar-admin functions create \ + --tenant public \ + --namespace default \ + --name myfunc \ + --py myfunc.py \ + --classname myfunc \ + --inputs persistent://public/default/in \ + --output persistent://public/default/out +``` + +Now let's make a consumer listen on the output topic for messages coming from the `myfunc` function using the [`pulsar-client consume`](reference-cli-tools.md#consume) command: + +```bash +$ bin/pulsar-client consume persistent://public/default/out \ + --subscription-name my-subscription \ + --num-messages 0 # Listen indefinitely +``` + +Now let's trigger that function: + +```bash +$ bin/pulsar-admin functions trigger \ + --tenant public \ + --namespace default \ + --name myfunc \ + --trigger-value "hello world" +``` + +The consumer listening on the output topic should then produce this in its logs: + +``` +----- got message ----- +This function has been triggered with a value of hello world +``` + +> #### Topic info not required +> In the `trigger` command above, you may have noticed that you only need to specify basic information about the function (tenant, namespace, and name). 
To trigger the function, you didn't need to know the function's input topic(s). \ No newline at end of file diff --git a/site2/docs/functions-guarantees.md b/site2/docs/functions-guarantees.md new file mode 100644 index 0000000000000..1b19ca2943c88 --- /dev/null +++ b/site2/docs/functions-guarantees.md @@ -0,0 +1,41 @@ +--- +id: functions-guarantees +title: Processing guarantees +sidebar_label: Processing guarantees +--- + +Pulsar Functions provides three different messaging semantics that you can apply to any function: + +Delivery semantics | Description +:------------------|:------- +**At-most-once** delivery | Each message that is sent to the function will most likely be processed but also may not be (hence the "at most") +**At-least-once** delivery | Each message that is sent to the function could be processed more than once (hence the "at least") +**Effectively-once** delivery | Each message that is sent to the function will have one output associated with it + +## Applying processing guarantees to a function + +You can set the processing guarantees for a Pulsar Function when you create the Function. This [`pulsar-admin functions create`](reference-pulsar-admin.md#create-1) command, for example, would apply effectively-once guarantees to the Function: + +```bash +$ bin/pulsar-admin functions create \ + --processing-guarantees EFFECTIVELY_ONCE \ + # Other function configs +``` + +The available options are: + +* `ATMOST_ONCE` +* `ATLEAST_ONCE` +* `EFFECTIVELY_ONCE` + +> By default, Pulsar Functions provide at-least-once delivery guarantees. So if you create a function without supplying a value for the `--processing-guarantees` flag, then the function will provide at-least-once guarantees. + +## Updating the processing guarantees of a function + +You can change the processing guarantees applied to a function once it's already been created using the [`update`](reference-pulsar-admin.md#update-1) command. 
Here's an example: + +```bash +$ bin/pulsar-admin functions update \ + --processing-guarantees ATMOST_ONCE \ + # Other function configs +``` \ No newline at end of file diff --git a/site2/docs/functions-metrics.md b/site2/docs/functions-metrics.md new file mode 100644 index 0000000000000..8c46ed4cf657c --- /dev/null +++ b/site2/docs/functions-metrics.md @@ -0,0 +1,43 @@ +--- +id: functions-metrics +title: Metrics for Pulsar Functions +sidebar_label: Metrics +--- + +Pulsar Functions can publish arbitrary metrics to the metrics interface which can then be queried. This doc contains instructions for publishing metrics using the [Java](#java-sdk) and [Python](#python-sdk) Pulsar Functions SDKs. + +> #### Metrics and stats not available through language-native interfaces +> If a Pulsar Function uses the language-native interface for [Java](functions-api.md#java-native-functions) or [Python](#python-native-functions), that function will not be able to publish metrics and stats to Pulsar. + +## Accessing metrics + +For a guide to accessing metrics created by Pulsar Functions, see the guide to [Monitoring](deploy-monitoring.md) in Pulsar. + +## Java SDK + +If you're creating a Pulsar Function using the [Java SDK](functions-api.md#java-sdk-functions), the {@inject: javadoc:Context:/pulsar-functions/org/apache/pulsar/functions/api/Context} object has a `recordMetric` method that you can use to register both a name for the metric and a value. 
Here's the signature for that method: + +```java +void recordMetric(String metricName, double value); +``` + +Here's an example function: + +```java +import org.apache.pulsar.functions.api.Context; +import org.apache.pulsar.functions.api.Function; + +public class MetricRecordingFunction implements Function<String, Void> { + @Override + public Void process(String input, Context context) { + context.recordMetric("number-of-characters", input.length()); + return null; + } +} +``` + +This function measures the length of each incoming message (of type `String`) and then records that value under the `number-of-characters` metric. + +## Python SDK + +Documentation for the [Python SDK](functions-api.md#python-sdk-functions) is coming soon. \ No newline at end of file diff --git a/site2/docs/functions-state.md b/site2/docs/functions-state.md new file mode 100644 index 0000000000000..a203d06e7e481 --- /dev/null +++ b/site2/docs/functions-state.md @@ -0,0 +1,117 @@ +--- +id: functions-state +title: Pulsar Functions State Storage (Developer Preview) +sidebar_label: State Storage +--- + +Since the Pulsar 2.1.0 release, Pulsar integrates with the Apache BookKeeper [table service](https://docs.google.com/document/d/155xAwWv5IdOitHh1NVMEwCMGgB28M3FyMiQSxEpjE-Y/edit#heading=h.56rbh52koe3f) +for storing the `State` for functions. For example, a `WordCount` function can store its `counters` state in BookKeeper's table service via the Pulsar Functions [State API](#api). + +## API + +### Java API + +Currently Pulsar Functions exposes the following APIs for mutating and accessing State. These APIs are available in the [Context](functions-api.md#context) object when +you are using [Java SDK](functions-api.md#java-sdk-functions) functions. 
+ +#### incrCounter + +```java + /** + * Increment the builtin distributed counter referred to by key + * @param key The name of the key + * @param amount The amount to be incremented + */ + void incrCounter(String key, long amount); +``` + +An application can use `incrCounter` to change the counter of a given `key` by the given `amount`. + +#### getCounter + +```java + /** + * Retrieve the counter value for the key. + * + * @param key name of the key + * @return the amount of the counter value for this key + */ + long getCounter(String key); +``` + +An application can use `getCounter` to retrieve the counter of a given `key` mutated by `incrCounter`. + +Besides the `counter` API, Pulsar also exposes a general key/value API for functions to store +general key/value state. + +#### putState + +```java + /** + * Update the state value for the key. + * + * @param key name of the key + * @param value state value of the key + */ + void putState(String key, ByteBuffer value); +``` + +#### getState + +```java + /** + * Retrieve the state value for the key. + * + * @param key name of the key + * @return the state value for the key. + */ + ByteBuffer getState(String key); +``` + +### Python API + +State is currently not supported in the [Python SDK](functions-api.md#python-sdk-functions). + +## Query State + +A Pulsar Function can use the [State API](#api) for storing state into Pulsar's state storage +and retrieving state back from Pulsar's state storage. Pulsar also provides +CLI commands for querying its state. + +```shell +$ bin/pulsar-admin functions querystate \ + --tenant \ + --namespace \ + --name \ + --state-storage-url \ + --key \ + [--watch] +``` + +If `--watch` is specified, the CLI will watch the value of the provided `state-key`.
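
`putState` and `getState` in the Java API above exchange values as `ByteBuffer`, so an application has to choose its own encoding. The following is a minimal sketch, assuming the values are UTF-8 strings. `StateValueCodecSketch`, `encode`, `decode`, and the in-memory map standing in for the state store are hypothetical names used for illustration only; they are not part of the Pulsar API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: not part of Pulsar. It only shows one way to move
// between String values and the ByteBuffer type used by putState/getState.
final class StateValueCodecSketch {

    // Wrap the UTF-8 bytes of the value in a ByteBuffer.
    static ByteBuffer encode(String value) {
        return ByteBuffer.wrap(value.getBytes(StandardCharsets.UTF_8));
    }

    // Read the remaining bytes of the buffer as a UTF-8 string.
    // duplicate() keeps the caller's buffer position untouched.
    static String decode(ByteBuffer buffer) {
        ByteBuffer copy = buffer.duplicate();
        byte[] bytes = new byte[copy.remaining()];
        copy.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Assumption: a plain map stands in for the function's state store.
        Map<String, ByteBuffer> fakeState = new HashMap<>();
        fakeState.put("last-word", encode("pulsar"));
        System.out.println(decode(fakeState.get("last-word")));
    }
}
```

Inside a real function, the map calls would presumably be replaced by `context.putState(key, encode(value))` and `decode(context.getState(key))`, using the signatures listed in the API section.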
+ +## Example + +### Java Example + +{@inject: github:`WordCountFunction`:/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WordCountFunction.java} is a good example +demonstrating how an application can easily store `state` in Pulsar Functions. + +```java +public class WordCountFunction implements Function<String, Void> { + @Override + public Void process(String input, Context context) throws Exception { + Arrays.asList(input.split("\\.")).forEach(word -> context.incrCounter(word, 1)); + return null; + } +} +``` + +The logic of this `WordCount` function is simple and straightforward: + +1. The function first splits the received `String` into multiple words using regex `\\.`. +2. For each `word`, the function increments the corresponding `counter` by 1 (via `incrCounter(key, amount)`). + +### Python Example + +State is currently not supported in the [Python SDK](functions-api.md#python-sdk-functions). \ No newline at end of file diff --git a/site2/docs/io-tcp.md b/site2/docs/io-tcp.md new file mode 100644 index 0000000000000..a6f4293e55d71 --- /dev/null +++ b/site2/docs/io-tcp.md @@ -0,0 +1,19 @@ +--- +id: io-tcp +title: Netty Tcp Connector +sidebar_label: Netty Tcp Connector +--- + +## Source + +The Netty Tcp Source connector listens for TCP messages from TCP clients and writes them to a user-defined Pulsar topic. +This connector is recommended for containerized (e.g. k8s) deployments. +Otherwise, if the connector is running in process or thread mode, the instances may conflict when listening on the same port. + +### Source Configuration Options + +| Name | Required | Default | Description | +|------|----------|---------|-------------| +| `host` | `false` | `127.0.0.1` | The host name or address that the source instance listens on. | +| `port` | `false` | `10999` | The port that the source instance listens on.
| +| `numberOfThreads` | `false` | `1` | The number of threads the Netty Tcp Server uses to accept incoming connections and handle the traffic of the accepted connections. | \ No newline at end of file diff --git a/site2/docs/security-token-admin.md b/site2/docs/security-token-admin.md new file mode 100644 index 0000000000000..5cf5120944476 --- /dev/null +++ b/site2/docs/security-token-admin.md @@ -0,0 +1,158 @@ +--- +id: security-token-admin +title: Token authentication admin +sidebar_label: Token authentication admin +--- + +## Token Authentication Overview + +Pulsar supports authenticating clients using security tokens that are based on +[JSON Web Tokens](https://jwt.io/introduction/) ([RFC-7519](https://tools.ietf.org/html/rfc7519)). + +Tokens are used to identify a Pulsar client and associate it with some "principal" (or "role"), which +is then granted permissions to do some actions (e.g. publish to or consume from a topic). + +A user will typically be given a token string by an administrator (or some automated service). + +The compact representation of a signed JWT is a string that looks like: + +``` + eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJKb2UifQ.ipevRNuRP6HflG8cFKnmUPtypruRC4fb1DWtoLL62SY + ``` + +The application specifies the token when creating the client instance. An alternative is to pass +a "token supplier", that is to say a function that returns the token whenever the client library +needs one. + +> #### Always use TLS transport encryption +> Sending a token is equivalent to sending a password over the wire. It is strongly recommended to +> always use TLS encryption when talking to the Pulsar service. See +> [Transport Encryption using TLS](security-tls-transport.md) + +## Secret vs Public/Private keys + +JWT supports two different kinds of keys to generate and validate the tokens: + + * Symmetric: + - there is a single ***Secret*** key that is used both to generate and validate tokens + * Asymmetric: there is a pair of keys.
+ - ***Private*** key is used to generate tokens + - ***Public*** key is used to validate tokens + +### Secret key + +When using a secret key, the administrator creates the key and +uses it to generate the client tokens. This key is also configured on +the brokers to allow them to validate the clients. + +#### Creating a secret key + +> The output file will be generated in the root of your Pulsar installation directory. You can also provide an absolute path for the output file. +```shell +$ bin/pulsar tokens create-secret-key --output my-secret.key +``` +To generate a base64-encoded secret key: +```shell +$ bin/pulsar tokens create-secret-key --output /opt/my-secret.key --base64 +``` + +### Public/Private keys + +With public/private keys, we need to create a pair of keys. Pulsar supports all algorithms supported by the Java JWT library shown [here](https://github.com/jwtk/jjwt#signature-algorithms-keys) + +#### Creating a key pair + +> The output file will be generated in the root of your Pulsar installation directory. You can also provide an absolute path for the output file. +```shell +$ bin/pulsar tokens create-key-pair --output-private-key my-private.key --output-public-key my-public.key +``` + + * `my-private.key` should be stored in a safe location and only used by the administrator to generate + new tokens. + * `my-public.key` is distributed to all Pulsar brokers. This file can be publicly shared without + any security concern. + +## Generating tokens + +A token is the credential associated with a user. The association is done through the "principal", +or "role". In the case of JWT tokens, this field is typically referred to as the **subject**, though +it's exactly the same concept. + +The generated token is therefore required to have a **subject** field set. + +```shell +$ bin/pulsar tokens create --secret-key file:///path/to/my-secret.key \ + --subject test-user +``` + +This will print the token string on stdout.
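
To see how the **subject** ends up inside a token, the payload segment of a compact JWT can be Base64URL-decoded. The following is a minimal sketch using only the Java standard library; `JwtPayloadPeek` and `decodePayload` are hypothetical names, and the token is the sample string that appears in this guide's proxy configuration. It does not verify the signature, so it is only suitable for inspection, never for authentication decisions.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper for inspection only: it decodes the payload segment
// of a compact JWT and does NOT check the signature.
final class JwtPayloadPeek {

    // A compact JWT has three dot-separated segments: header.payload.signature.
    static String decodePayload(String compactJwt) {
        String[] parts = compactJwt.split("\\.");
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected header.payload.signature");
        }
        // JWT segments use the URL-safe Base64 alphabet without padding.
        byte[] json = Base64.getUrlDecoder().decode(parts[1]);
        return new String(json, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String token = "eyJhbGciOiJIUzI1NiJ9"
                + ".eyJzdWIiOiJ0ZXN0LXVzZXIifQ"
                + ".9OHgE9ZUDeBTZs7nSMEFIuGNEX18FLR3qvy8mqxSxXw";
        System.out.println(decodePayload(token)); // prints {"sub":"test-user"}
    }
}
```

The `sub` claim in the decoded JSON is the JWT name for the subject, which is the value the broker treats as the principal or role.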
+ +Similarly, one can create a token by passing the "private" key: + +```shell +$ bin/pulsar tokens create --private-key file:///path/to/my-private.key \ + --subject test-user +``` + +Finally, a token can also be created with a pre-defined TTL. After that time, +the token will be automatically invalidated. + +```shell +$ bin/pulsar tokens create --secret-key file:///path/to/my-secret.key \ + --subject test-user \ + --expiry-time 1y +``` + +## Authorization + +The token itself doesn't have any permission associated. That will be determined by the +authorization engine. Once the token is created, one can grant permission for this token to do certain +actions. Eg. : + +```shell +$ bin/pulsar-admin namespaces grant-permission my-tenant/my-namespace \ + --role test-user \ + --actions produce,consume +``` + +## Enabling Token Authentication ... + +### ... on Brokers + +To configure brokers to authenticate clients, put the following in `broker.conf`: + +```properties +# Configuration to enable authentication and authorization +authenticationEnabled=true +authorizationEnabled=true +authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken + +# If using secret key +tokenSecretKey=file:///path/to/secret.key +# The key can also be passed inline: +# tokenSecretKey=data:base64,FLFyW0oLJ2Fi22KKCm21J18mbAdztfSHN/lAT5ucEKU= + +# If using public/private +# tokenPublicKey=file:///path/to/public.key +``` + +### ... on Proxies + +To configure proxies to authenticate clients, put the following in `proxy.conf`: + +The proxy will have its own token used when talking to brokers. The role token for this +key pair should be configured in the ``proxyRoles`` of the brokers. See the [authorization guide](security-authorization.md) for more details. 
+ +```properties +# For clients connecting to the proxy +authenticationEnabled=true +authorizationEnabled=true +authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken +tokenSecretKey=file:///path/to/secret.key + +# For the proxy to connect to brokers +brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationToken +brokerClientAuthenticationParameters=token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0LXVzZXIifQ.9OHgE9ZUDeBTZs7nSMEFIuGNEX18FLR3qvy8mqxSxXw +# Or, alternatively, read token from file +# brokerClientAuthenticationParameters=file:///path/to/proxy-token.txt +``` \ No newline at end of file diff --git a/site2/website/pages/en/download.js b/site2/website/pages/en/download.js index 8b966f2ef22a0..4ad16735aa9d8 100644 --- a/site2/website/pages/en/download.js +++ b/site2/website/pages/en/download.js @@ -228,27 +228,31 @@ class Download extends React.Component { {releaseInfo.map( - info => - info.version !== latestVersion && ( - + info => { + var sha = "sha512" + if (info.version.includes('1.19.0-incubating') || info.version.includes('1.20.0-incubating')) { + sha = "sha" + } + return info.version !== latestVersion && ( + {info.version} - apache-pulsar-{info.version}-bin-tar.gz -   + apache-pulsar-{info.version}-bin-tar.gz   (asc,  - sha512) - - - apache-pulsar-{info.version}-bin-tar.gz -   - (asc  - sha512) - - + {`${sha}`}) + + + apache-pulsar-{info.version}-src-tar.gz +   + (asc,  + {`${sha}`}) + + Release Notes - - - ) + + + ) + } )} diff --git a/site2/website/versioned_docs/version-2.1.0-incubating/getting-started-standalone.md b/site2/website/versioned_docs/version-2.1.0-incubating/getting-started-standalone.md index 940411579c394..bb86dff0dabd8 100644 --- a/site2/website/versioned_docs/version-2.1.0-incubating/getting-started-standalone.md +++ b/site2/website/versioned_docs/version-2.1.0-incubating/getting-started-standalone.md @@ -51,7 +51,7 @@ Directory | Contains `conf` | Configuration files for 
Pulsar, including for [broker configuration](reference-configuration.md#broker), [ZooKeeper configuration](reference-configuration.md#zookeeper), and more `examples` | A Java JAR file containing example [Pulsar Functions](functions-overview.md) `lib` | The [JAR](https://en.wikipedia.org/wiki/JAR_(file_format)) files used by Pulsar -`licenses` | License files, in `.txt` form, for various components of the Pulsar [codebase](developing-codebase.md) +`licenses` | License files, in `.txt` form, for various components of the Pulsar [codebase](https://github.com/apache/pulsar) These directories will be created once you begin running Pulsar: diff --git a/site2/website/versioned_docs/version-2.1.0-incubating/io-managing.md b/site2/website/versioned_docs/version-2.1.0-incubating/io-managing.md index 7ed1e4e255f56..e7b5ec212fb85 100644 --- a/site2/website/versioned_docs/version-2.1.0-incubating/io-managing.md +++ b/site2/website/versioned_docs/version-2.1.0-incubating/io-managing.md @@ -140,7 +140,7 @@ Here's an example to submit a Cassandra sink: ## Monitoring Connectors -Since Pulsar IO connectors are running as [Pulsar Functions](functions-overiew.md), so you can use [`functions`](reference-pulsar-admin.md#source) commands +Since Pulsar IO connectors are running as [Pulsar Functions](functions-overview.md), so you can use [`functions`](reference-pulsar-admin.md#source) commands available in the [`pulsar-admin`](reference-pulsar-admin.md) CLI tool. 
### Retrieve Connector Metadata diff --git a/site2/website/versioned_docs/version-2.1.1-incubating/getting-started-standalone.md b/site2/website/versioned_docs/version-2.1.1-incubating/getting-started-standalone.md index dc05725f3bdc1..c5b3c12ee9815 100644 --- a/site2/website/versioned_docs/version-2.1.1-incubating/getting-started-standalone.md +++ b/site2/website/versioned_docs/version-2.1.1-incubating/getting-started-standalone.md @@ -50,7 +50,7 @@ Directory | Contains `conf` | Configuration files for Pulsar, including for [broker configuration](reference-configuration.md#broker), [ZooKeeper configuration](reference-configuration.md#zookeeper), and more `examples` | A Java JAR file containing example [Pulsar Functions](functions-overview.md) `lib` | The [JAR](https://en.wikipedia.org/wiki/JAR_(file_format)) files used by Pulsar -`licenses` | License files, in `.txt` form, for various components of the Pulsar [codebase](developing-codebase.md) +`licenses` | License files, in `.txt` form, for various components of the Pulsar [codebase](https://github.com/apache/pulsar) These directories will be created once you begin running Pulsar: diff --git a/site2/website/versioned_docs/version-2.1.1-incubating/io-managing.md b/site2/website/versioned_docs/version-2.1.1-incubating/io-managing.md index 7d20afbc06e84..c356485c53abe 100644 --- a/site2/website/versioned_docs/version-2.1.1-incubating/io-managing.md +++ b/site2/website/versioned_docs/version-2.1.1-incubating/io-managing.md @@ -140,7 +140,7 @@ Here's an example to submit a Cassandra sink: ## Monitoring Connectors -Since Pulsar IO connectors are running as [Pulsar Functions](functions-overiew.md), so you can use [`functions`](reference-pulsar-admin.md#source) commands +Since Pulsar IO connectors are running as [Pulsar Functions](functions-overview.md), so you can use [`functions`](reference-pulsar-admin.md#source) commands available in the [`pulsar-admin`](reference-pulsar-admin.md) CLI tool. 
### Retrieve Connector Metadata diff --git a/site2/website/versioned_docs/version-2.2.0/getting-started-standalone.md b/site2/website/versioned_docs/version-2.2.0/getting-started-standalone.md index a111cb6c92c8a..0b5ebe6d4ba17 100644 --- a/site2/website/versioned_docs/version-2.2.0/getting-started-standalone.md +++ b/site2/website/versioned_docs/version-2.2.0/getting-started-standalone.md @@ -50,7 +50,7 @@ Directory | Contains `conf` | Configuration files for Pulsar, including for [broker configuration](reference-configuration.md#broker), [ZooKeeper configuration](reference-configuration.md#zookeeper), and more `examples` | A Java JAR file containing example [Pulsar Functions](functions-overview.md) `lib` | The [JAR](https://en.wikipedia.org/wiki/JAR_(file_format)) files used by Pulsar -`licenses` | License files, in `.txt` form, for various components of the Pulsar [codebase](developing-codebase.md) +`licenses` | License files, in `.txt` form, for various components of the Pulsar [codebase](http://github.com/apache/pulsar) These directories will be created once you begin running Pulsar: diff --git a/site2/website/versioned_docs/version-2.2.0/io-managing.md b/site2/website/versioned_docs/version-2.2.0/io-managing.md index 31848dce74891..39697ddd94c57 100644 --- a/site2/website/versioned_docs/version-2.2.0/io-managing.md +++ b/site2/website/versioned_docs/version-2.2.0/io-managing.md @@ -140,7 +140,7 @@ Here's an example to submit a Cassandra sink: ## Monitoring Connectors -Since Pulsar IO connectors are running as [Pulsar Functions](functions-overiew.md), so you can use [`functions`](reference-pulsar-admin.md#source) commands +Since Pulsar IO connectors are running as [Pulsar Functions](functions-overview.md), so you can use [`functions`](reference-pulsar-admin.md#source) commands available in the [`pulsar-admin`](reference-pulsar-admin.md) CLI tool. 
### Retrieve Connector Metadata diff --git a/site2/website/versioned_docs/version-2.2.1/getting-started-standalone.md b/site2/website/versioned_docs/version-2.2.1/getting-started-standalone.md index 5845bce9ed01c..15527dd3853bb 100644 --- a/site2/website/versioned_docs/version-2.2.1/getting-started-standalone.md +++ b/site2/website/versioned_docs/version-2.2.1/getting-started-standalone.md @@ -50,7 +50,7 @@ Directory | Contains `conf` | Configuration files for Pulsar, including for [broker configuration](reference-configuration.md#broker), [ZooKeeper configuration](reference-configuration.md#zookeeper), and more `examples` | A Java JAR file containing example [Pulsar Functions](functions-overview.md) `lib` | The [JAR](https://en.wikipedia.org/wiki/JAR_(file_format)) files used by Pulsar -`licenses` | License files, in `.txt` form, for various components of the Pulsar [codebase](developing-codebase.md) +`licenses` | License files, in `.txt` form, for various components of the Pulsar [codebase](http://github.com/apache/pulsar) These directories will be created once you begin running Pulsar: diff --git a/site2/website/versioned_docs/version-2.4.0/io-managing.md b/site2/website/versioned_docs/version-2.4.0/io-managing.md index b09a0cd52057d..8b06593f7c5b3 100644 --- a/site2/website/versioned_docs/version-2.4.0/io-managing.md +++ b/site2/website/versioned_docs/version-2.4.0/io-managing.md @@ -140,7 +140,7 @@ Here's an example to submit a Cassandra sink: ## Monitoring Connectors -Since Pulsar IO connectors are running as [Pulsar Functions](functions-overiew.md), so you can use [`functions`](reference-pulsar-admin.md#source) commands +Since Pulsar IO connectors are running as [Pulsar Functions](functions-overview.md), so you can use [`functions`](reference-pulsar-admin.md#source) commands available in the [`pulsar-admin`](reference-pulsar-admin.md) CLI tool. ### Retrieve Connector Metadata