docs/content.zh/docs/dev/table/functions/udfs.md
29 additions & 29 deletions
@@ -38,7 +38,7 @@ under the License.
 当前 Flink 有如下几种函数:
 
 - *标量函数* 将标量值转换成一个新标量值;
-- *Asynchronous scalar functions* asynchronously map scalar values to a new scalar value.
+- *异步标量函数* 异步地将标量值映射为新的标量值。
 - *表值函数* 将标量值转换成新的行数据;
 - *Async Table functions* asynchronously map scalar values to new rows and can be used for table sources that perform a lookup.
 - *聚合函数* 将多行数据里的标量值转换成一个新标量值;
@@ -976,26 +976,26 @@ env.sqlQuery("SELECT HashFunction(myField) FROM MyTable")
 
 {{< top >}}
 
-Asynchronous Scalar Functions
+异步标量函数
 ----------------
 
-When interacting with external systems (for example when enriching stream events with data stored in a database), one needs to take care that network or other latency does not dominate the streaming application’s running time.
-Naively accessing data in the external database, for example using a `ScalarFunction`, typically means **synchronous** interaction: A request is sent to the database and the `ScalarFunction` waits until the response has been received. In many cases, this waiting makes up the vast majority of the function’s time.
-To address this inefficiency, there is an `AsyncScalarFunction`. Asynchronous interaction with the database means that a single function instance can handle many requests concurrently and receive the responses concurrently. That way, the waiting time can be overlaid with sending other requests and receiving responses. At the very least, the waiting time is amortized over multiple requests. This leads in most cases to much higher streaming throughput.
-A user-defined asynchronous scalar function maps zero, one, or multiple scalar values to a new scalar value. Any data type listed in the [data types section]({{< ref "docs/dev/table/types" >}}) can be used as a parameter or return type of an evaluation method.
-In order to define an asynchronous scalar function, extend the base class `AsyncScalarFunction` in `org.apache.flink.table.functions` and implement one or more evaluation methods named `eval(...)`. The first argument must be a `CompletableFuture<...>` which is used to return the result, with subsequent arguments being the parameters passed to the function.
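As a rough, dependency-free illustration of the `eval(...)` contract described above, the following plain-Java sketch mirrors the method shape (result future first, parameters after) and completes the future from a thread pool. It is not the example from the documentation itself: the class `BeverageNameLookupSketch`, the in-memory `NAMES` map, and the pool size are assumptions made for illustration, and the class deliberately does not extend `AsyncScalarFunction` so that it runs without Flink on the classpath.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a class that would extend AsyncScalarFunction.
// It has no Flink dependency so the sketch runs on a plain JDK (9 or later).
class BeverageNameLookupSketch {
    // Assumed lookup data; a real function would query an external system.
    private static final Map<Integer, String> NAMES =
            Map.of(0, "Latte", 1, "Cappuccino", 2, "Espresso");

    private final ExecutorService executor = Executors.newFixedThreadPool(4);

    // Same shape as the eval(...) contract: the future that receives the
    // result comes first, followed by the function's parameters.
    // The method returns immediately; the work happens on the pool.
    public void eval(CompletableFuture<String> future, Integer beverageId) {
        executor.execute(() -> {
            String name = NAMES.get(beverageId);
            if (name == null) {
                // Signal an error through the future instead of throwing.
                future.completeExceptionally(
                        new IllegalArgumentException("Unknown beverage id: " + beverageId));
            } else {
                future.complete(name);
            }
        });
    }

    public void close() {
        executor.shutdown();
    }

    public static void main(String[] args) {
        BeverageNameLookupSketch fn = new BeverageNameLookupSketch();
        CompletableFuture<String> result = new CompletableFuture<>();
        fn.eval(result, 1);
        System.out.println(result.join()); // prints "Cappuccino"
        fn.close();
    }
}
```

In an actual Flink job the class would extend `AsyncScalarFunction`, and the thread pool would more likely be created in `open(...)` than in a field initializer, since function instances are serialized before they are shipped to the cluster.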
-The number of outstanding calls to `eval` may be configured by [`table.exec.async-scalar.max-concurrent-operations`]({{< ref "docs/dev/table/config#table-exec-async-scalar-max-concurrent-operations" >}}).
-The following example shows how to do work on a thread pool in the background, though any libraries exposing an async interface may be directly used to complete the `CompletableFuture` from a callback. See the [Implementation Guide](#implementation-guide) for more details.
 env.sqlQuery("SELECT GetBeverageName(beverageId) FROM Beverages");
 
 ```
 
-#### Asynchronous Semantics
-While calls to an `AsyncScalarFunction` may be completed out of the original input order, to maintain correct semantics, the outputs of the function are guaranteed to maintain that input order to downstream components of the query. The data itself could reveal completion order (e.g. by containing fetch timestamps), so the user should consider whether this is acceptable for their use-case.
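The ordering guarantee above can be pictured with a small plain-Java model: calls are started concurrently and may finish in any order, but results are handed downstream strictly in input order. This is only a conceptual sketch, not how Flink implements it; `OrderedAsyncSketch`, `mapInOrder`, the artificial delays, and the pool size are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Conceptual model only: concurrent calls, input-ordered output.
class OrderedAsyncSketch {
    static List<String> mapInOrder(List<Integer> inputs) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            // One pending future per input, stored in input order.
            List<CompletableFuture<String>> pending = new ArrayList<>();
            for (int i = 0; i < inputs.size(); i++) {
                final int value = inputs.get(i);
                // Earlier inputs are given longer delays, so they tend to finish last.
                final long delayMs = 20L * (inputs.size() - i);
                CompletableFuture<String> future = new CompletableFuture<>();
                pending.add(future);
                executor.execute(() -> {
                    try {
                        Thread.sleep(delayMs);
                        future.complete("value-" + value);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        future.completeExceptionally(e);
                    }
                });
            }
            // Emit in input order by waiting on the futures in list order,
            // regardless of which call completed first.
            List<String> output = new ArrayList<>();
            for (CompletableFuture<String> future : pending) {
                output.add(future.join());
            }
            return output;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(mapInOrder(List.of(1, 2, 3))); // prints [value-1, value-2, value-3]
    }
}
```

If each result carried a completion timestamp, those timestamps would still expose the out-of-order completion even though the rows arrive in input order, which is the caveat the paragraph raises.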
-The primary way for a user to indicate an error is to call `CompletableFuture.completeExceptionally(Throwable)`. Similarly, if an exception is encountered by the system when invoking `eval`, that will also result in an error. When an error occurs, the system will consider the retry strategy, configured by [`table.exec.async-scalar.retry-strategy`]({{< ref "docs/dev/table/config#table-exec-async-scalar-retry-strategy" >}}). If this is `NO_RETRY`, the job is failed. If it is set to `FIXED_DELAY`, a period of [`table.exec.async-scalar.retry-delay`]({{< ref "docs/dev/table/config#table-exec-async-scalar-retry-delay" >}}) will be waited, and the function call will be retried. If there have been [`table.exec.async-scalar.max-attempts`]({{< ref "docs/dev/table/config#table-exec-async-scalar-max-attempts" >}}) failed attempts or if the timeout [`table.exec.async-scalar.timeout`]({{< ref "docs/dev/table/config#table-exec-async-scalar-timeout" >}}) expires (including all retry attempts), the job will fail.
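To make the `FIXED_DELAY` behaviour described above more concrete, here is a simplified plain-Java model of "retry after a fixed delay until the maximum number of attempts is reached". It is a sketch under stated assumptions rather than Flink's implementation: `FixedDelayRetrySketch` and `callWithFixedDelay` are hypothetical names, the overall timeout is not modelled, and `NO_RETRY` is approximated by passing `maxAttempts = 1`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Simplified model of a fixed-delay retry policy (no overall timeout).
class FixedDelayRetrySketch {
    // Runs `attempt` until it succeeds or `maxAttempts` attempts have failed,
    // sleeping `delayMs` between attempts. An exception thrown while invoking
    // the attempt counts as a failure, just like an exceptionally completed future.
    static <T> T callWithFixedDelay(
            Supplier<CompletableFuture<T>> attempt, int maxAttempts, long delayMs) {
        Throwable lastError = null;
        for (int i = 1; i <= maxAttempts; i++) {
            try {
                return attempt.get().join();
            } catch (RuntimeException e) {
                // Unwrap the cause when the future was completed exceptionally.
                lastError = (e instanceof CompletionException && e.getCause() != null)
                        ? e.getCause()
                        : e;
            }
            if (i < maxAttempts) {
                try {
                    Thread.sleep(delayMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting to retry", e);
                }
            }
        }
        // Stands in for "the job will fail" once attempts are exhausted.
        throw new IllegalStateException("All " + maxAttempts + " attempts failed", lastError);
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        String result = callWithFixedDelay(() -> {
            CompletableFuture<String> future = new CompletableFuture<>();
            if (calls.incrementAndGet() < 3) {
                future.completeExceptionally(new RuntimeException("transient failure"));
            } else {
                future.complete("ok");
            }
            return future;
        }, 3, 10L);
        System.out.println(result + " after " + calls.get() + " calls"); // prints "ok after 3 calls"
    }
}
```

In a real job these knobs are not coded by hand; they come from the `table.exec.async-scalar.*` options named in the paragraph.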
-One thing to consider is if the UDF contains CPU intensive logic with no blocking calls. If so, it likely doesn't require asynchronous functionality and could use a `ScalarFunction`. If the logic involves waiting for things like network or background operations (e.g. database lookups, RPCs, or REST calls), this may be a useful way to speed things up. There are also some queries that don't support `AsyncScalarFunction`, so when in doubt, `ScalarFunction` should be used.