[Bug] grpc SubStreamHandler exception #4804

Closed
Labels
bug (Something isn't working)

Description

Search before asking

  • I had searched in the issues and found no similar issues.

Environment

Linux

EventMesh version

1.10.0

What happened

There is a problem in the SubStreamHandler class. gRPC is bidirectional, and I have integrated both the gRPC pub and sub functions in my project.
The following code throws an exception every 30 seconds:

private void senderOnNext(final CloudEvent subscription) {
    try {
        synchronized (sender) {
            sender.onNext(subscription);
        }
    } catch (Exception e) {
        log.error("StreamObserver Error onNext", e);
    }
}

I analyzed the cause: when the exception is thrown, onCompleted should be called to close the sender, but that is not done here. As a result, gRPC thinks the connection is still open and keeps throwing exceptions in a loop.

I tried the change below, and it solved the problem.
Could this fix be merged soon? I am using the project, and this problem is blocking me.

@Slf4j
public class SubStreamHandler<T> extends Thread implements Serializable {

    ...
    public SubStreamHandler(final ConsumerServiceStub consumerAsyncClient, final EventMeshGrpcClientConfig clientConfig,
                            final ReceiveMsgHook<T> listener) {
        this.consumerAsyncClient = consumerAsyncClient;
        this.clientConfig = clientConfig;
        this.listener = listener;
    }

    public void sendSubscription(final CloudEvent subscription) {
        // Lazily open the stream; after a failure the sender is null, so this reopens it.
        synchronized (this) {
            if (this.sender == null) {
                this.sender = consumerAsyncClient.subscribeStream(createReceiver());
            }
        }
        senderOnNext(subscription);
    }

    private StreamObserver<CloudEvent> createReceiver() {
        return new StreamObserver<CloudEvent>() {

            @Override
            public void onNext(final CloudEvent message) {
                T msg = EventMeshCloudEventBuilder.buildMessageFromEventMeshCloudEvent(message, listener.getProtocolType());
                if (msg instanceof Set) {
                    log.info("Received message from Server:{}", message);
                } else {
                    log.info("Received message from Server.|seq={}|uniqueId={}|",
                        EventMeshCloudEventUtils.getSeqNum(message), EventMeshCloudEventUtils.getUniqueId(message));
                    CloudEvent streamReply = null;
                    try {
                        Optional<T> reply = listener.handle(msg);
                        if (reply.isPresent()) {
                            streamReply = buildReplyMessage(message, reply.get());
                        }
                    } catch (Exception e) {
                        log.error("Error in handling reply message.|seq={}|uniqueId={}|",
                            EventMeshCloudEventUtils.getSeqNum(message), EventMeshCloudEventUtils.getUniqueId(message), e);
                    }
                    if (streamReply != null) {
                        log.info("Sending reply message to Server.|seq={}|uniqueId={}|",
                            EventMeshCloudEventUtils.getSeqNum(streamReply), EventMeshCloudEventUtils.getUniqueId(streamReply));
                        senderOnNext(streamReply);
                    }
                }
            }

            @Override
            public void onError(final Throwable t) {
                log.error("Received Server side error", t);
                close(); // added: release the sender when the server reports an error
            }

            @Override
            public void onCompleted() {
                log.info("Finished receiving messages from server.");
                close();
            }
        };
    }

    private CloudEvent buildReplyMessage(final CloudEvent reqMessage, final T replyMessage) {
        final CloudEvent cloudEvent = EventMeshCloudEventBuilder.buildEventMeshCloudEvent(replyMessage,
            clientConfig, listener.getProtocolType());

        return CloudEvent.newBuilder(cloudEvent).putAllAttributes(reqMessage.getAttributesMap()).putAllAttributes(cloudEvent.getAttributesMap())
            .putAttributes(ProtocolKey.DATA_CONTENT_TYPE,
                CloudEventAttributeValue.newBuilder().setCeString(EventMeshDataContentType.JSON.getCode()).build())
            // Indicate that it is a subscription response
            .putAttributes(ProtocolKey.SUB_MESSAGE_TYPE, CloudEventAttributeValue.newBuilder().setCeString(ProtocolKey.SUB_REPLY_MESSAGE).build())
            .build();
    }

    @Override
    public void run() {
        try {
            latch.await();
        } catch (InterruptedException e) {
            log.error("SubStreamHandler Thread interrupted", e);
            Thread.currentThread().interrupt();
        }
    }

    public void close() {
        if (this.sender != null) {
            senderOnComplete();
        }

        latch.countDown();

        log.info("SubStreamHandler closed.");
    }

    private void senderOnNext(final CloudEvent subscription) {
        try {
            synchronized (sender) {
                sender.onNext(subscription);
            }
        } catch (Exception e) {
            log.error("StreamObserver Error onNext", e);
            close(); // added: close the broken stream instead of leaving it in place
        }
    }

    private void senderOnComplete() {
        try {
            synchronized (sender) {
                sender.onCompleted();
                sender = null; // added: forget the closed stream so it is not reused
            }
        } catch (Exception e) {
            log.error("StreamObserver Error onComplete", e);
        }
    }

}
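
The core of the change above can be shown outside EventMesh: when a send fails, complete the stream and drop the reference, so the next send opens a new stream instead of reusing the broken one. The following is only a minimal sketch of that pattern, not the EventMesh implementation. Observer, GuardedSender and GuardedSenderDemo are hypothetical names, and Observer stands in for gRPC's StreamObserver so the example runs without gRPC on the classpath. It also assumes that reopening the stream on the next send is acceptable, which mirrors the lazy creation in sendSubscription.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for io.grpc.stub.StreamObserver (no gRPC dependency needed).
interface Observer<T> {
    void onNext(T value);
    void onCompleted();
}

// Hypothetical helper: owns one outbound stream and drops it after the first failed send.
class GuardedSender<T> {
    private final Supplier<Observer<T>> opener; // opens a new stream on demand
    private Observer<T> sender;                 // null means "no open stream"

    GuardedSender(Supplier<Observer<T>> opener) {
        this.opener = opener;
    }

    // Returns true if the value was handed to the stream, false if the stream was closed instead.
    synchronized boolean send(T value) {
        if (sender == null) {
            sender = opener.get();
        }
        try {
            sender.onNext(value);
            return true;
        } catch (RuntimeException e) {
            close();
            return false;
        }
    }

    synchronized void close() {
        Observer<T> current = sender;
        sender = null; // clear first so a failing onCompleted cannot leave a stale reference
        if (current != null) {
            try {
                current.onCompleted();
            } catch (RuntimeException e) {
                // The stream is already broken; there is nothing more to release.
            }
        }
    }

    synchronized boolean isOpen() {
        return sender != null;
    }
}

class GuardedSenderDemo {
    public static void main(String[] args) {
        List<String> delivered = new ArrayList<>();
        int[] opened = {0};
        GuardedSender<String> sender = new GuardedSender<>(() -> {
            final int id = ++opened[0];
            return new Observer<String>() {
                @Override public void onNext(String v) {
                    // Only the first stream is broken; later streams accept messages.
                    if (id == 1) throw new IllegalStateException("stream is broken");
                    delivered.add(v);
                }
                @Override public void onCompleted() { }
            };
        });
        System.out.println(sender.send("a")); // false: first stream fails and is closed
        System.out.println(sender.isOpen());  // false: no stale reference is kept
        System.out.println(sender.send("b")); // true: a new stream was opened
        System.out.println(delivered);        // [b]
    }
}
```

Both send and close hold the same lock here, so a close triggered by a failed send cannot race with another send. The patch above instead synchronizes on the sender object itself, which is null after a close.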

How to reproduce

When an exception occurs in this method, for example a gRPC timeout, it keeps failing in an endless loop:
private void senderOnNext(final CloudEvent subscription) {
    try {
        synchronized (sender) {
            sender.onNext(subscription);
        }
    } catch (Exception e) {
        log.error("StreamObserver Error onNext", e);
    }
}
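
The behaviour can be imitated without a server. This is a rough sketch under stated assumptions, not a trace from EventMesh: BrokenStream, UnguardedSender and ReproDemo are hypothetical names, BrokenStream stands in for a gRPC request stream that has already failed, and the loop stands in for whatever re-sends periodically (the report observes a 30 second interval). Because the exception is only logged and the same stream is kept, every later send fails the same way.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a request stream that has already failed (for example after a timeout).
class BrokenStream {
    final AtomicInteger calls = new AtomicInteger();

    void onNext(String message) {
        calls.incrementAndGet();
        throw new IllegalStateException("stream is broken");
    }
}

// Mirrors the shape of the original senderOnNext: the exception is logged and swallowed,
// and the broken stream stays in place for the next call.
class UnguardedSender {
    private final BrokenStream sender;
    int loggedErrors = 0;

    UnguardedSender(BrokenStream sender) {
        this.sender = sender;
    }

    void senderOnNext(String subscription) {
        try {
            synchronized (sender) {
                sender.onNext(subscription);
            }
        } catch (Exception e) {
            loggedErrors++; // stands in for log.error("StreamObserver Error onNext", e)
        }
    }
}

class ReproDemo {
    public static void main(String[] args) {
        BrokenStream stream = new BrokenStream();
        UnguardedSender handler = new UnguardedSender(stream);
        // Three periodic sends against the same broken stream.
        for (int tick = 0; tick < 3; tick++) {
            handler.senderOnNext("subscription");
        }
        System.out.println(handler.loggedErrors); // 3: one error per send
        System.out.println(stream.calls.get());   // 3: the broken stream is reused every time
    }
}
```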

Debug logs

no

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
