alink1.5.2 getOutputTable().to_pandas() raise ERROR #197

Open

ygean opened this issue Feb 10, 2022 · 3 comments
ygean commented Feb 10, 2022

Sample code is as follows:

import pandas as pd
from pyalink.alink import *

input_users = [[617127], [4841758], [382730], [1399180]]
df_users = pd.DataFrame(input_users)
sdata = StreamOperator.fromDataframe(df_users, schemaStr='user bigint')
# `model` is an ItemCf model trained beforehand (definition omitted here)
predictor = ItemCfItemsPerUserRecommStreamOp(model)\
    .setUserCol("user")\
    .setReservedCols(["user"])\
    .setK(3)\
    .setRecommCol("prediction_result")
b = predictor.linkFrom(sdata).getOutputTable()
StreamOperator.execute()
b.to_pandas()

The following error is raised:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
Input In [37], in <module>
----> 1 b.to_pandas()

File ~/.local/lib/python3.8/site-packages/pyflink/table/table.py:998, in Table.to_pandas(self)
    995 gateway = get_gateway()
    996 max_arrow_batch_size = self._j_table.getTableEnvironment().getConfig().getConfiguration()\
    997     .getInteger(gateway.jvm.org.apache.flink.python.PythonOptions.MAX_ARROW_BATCH_SIZE)
--> 998 batches_iterator = gateway.jvm.org.apache.flink.table.runtime.arrow.ArrowUtils\
    999     .collectAsPandasDataFrame(self._j_table, max_arrow_batch_size)
   1000 if batches_iterator.hasNext():
   1001     import pytz

File ~/.local/lib/python3.8/site-packages/py4j/java_gateway.py:1285, in JavaMember.__call__(self, *args)
   1279 command = proto.CALL_COMMAND_NAME +\
   1280     self.command_header +\
   1281     args_command +\
   1282     proto.END_COMMAND_PART
   1284 answer = self.gateway_client.send_command(command)
-> 1285 return_value = get_return_value(
   1286     answer, self.gateway_client, self.target_id, self.name)
   1288 for temp_arg in temp_args:
   1289     temp_arg._detach()

File ~/.local/lib/python3.8/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling z:org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame.
: java.lang.UnsupportedOperationException: Python vectorized UDF doesn't support logical type LEGACY('RAW', 'ANY<com.alibaba.alink.common.MTable>') currently.
	at org.apache.flink.table.runtime.arrow.ArrowUtils$LogicalTypeToArrowTypeConverter.defaultMethod(ArrowUtils.java:947)
	at org.apache.flink.table.runtime.arrow.ArrowUtils$LogicalTypeToArrowTypeConverter.defaultMethod(ArrowUtils.java:821)
	at org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:202)
	at org.apache.flink.table.types.logical.LegacyTypeInformationType.accept(LegacyTypeInformationType.java:107)
	at org.apache.flink.table.runtime.arrow.ArrowUtils.toArrowField(ArrowUtils.java:227)
	at org.apache.flink.table.runtime.arrow.ArrowUtils.lambda$toArrowSchema$0(ArrowUtils.java:218)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
	at org.apache.flink.table.runtime.arrow.ArrowUtils.toArrowSchema(ArrowUtils.java:219)
	at org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame(ArrowUtils.java:662)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)

Dependency list:
pandas==1.3.5

ygean (Author) commented Feb 10, 2022

Downgrading Alink to 1.4.0 and reinstalling pandas==1.1.5 fixed it.
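For reference, the downgrade described above can be done with pip (a sketch assuming `pyalink` is the installed package name and that version 1.4.0 is available on PyPI):

```shell
# Pin PyAlink and pandas to the versions reported to work together
pip install pyalink==1.4.0 pandas==1.1.5
```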

Fanoid (Collaborator) commented Feb 11, 2022

Hi.
to_pandas is provided by PyFlink, not PyAlink.
From the error message, it seems to_pandas does not support Alink-defined types:
: java.lang.UnsupportedOperationException: Python vectorized UDF doesn't support logical type LEGACY('RAW', 'ANY<com.alibaba.alink.common.MTable>') currently.

In previous PyAlink versions, this operator didn't use the Alink-defined type (MTable), so to_pandas worked.
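To illustrate the failure mode, here is a minimal pure-Python sketch (hypothetical names, not Flink's actual code): ArrowUtils walks the table schema and maps each Flink logical type to an Arrow type, and any type without a mapping, such as the legacy RAW type wrapping Alink's MTable, falls through to a default handler that raises.

```python
# Sketch of the type dispatch in ArrowUtils.toArrowSchema: each supported
# Flink logical type maps to an Arrow type; anything unmapped hits a
# default handler that raises — which is what happens for Alink's MTable.

ARROW_TYPE_MAP = {
    "BIGINT": "int64",
    "DOUBLE": "float64",
    "VARCHAR": "utf8",
}

def to_arrow_type(logical_type: str) -> str:
    """Map a Flink logical type name to an Arrow type name."""
    if logical_type in ARROW_TYPE_MAP:
        return ARROW_TYPE_MAP[logical_type]
    # LEGACY('RAW', 'ANY<com.alibaba.alink.common.MTable>') lands here:
    raise NotImplementedError(
        f"Python vectorized UDF doesn't support logical type {logical_type}"
    )

print(to_arrow_type("BIGINT"))  # the 'user bigint' column converts fine
```

This is why the `user bigint` column alone would convert, but the `prediction_result` column, carrying an MTable, makes the whole schema conversion fail before any rows are collected.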

ygean (Author) commented Feb 11, 2022

@Fanoid Thanks.

@ygean ygean closed this as completed Feb 11, 2022
@ygean ygean reopened this Feb 14, 2022