Skip to content

Commit

Permalink
Add watchAllResource subscriber (#1020)
Browse files Browse the repository at this point in the history
  • Loading branch information
brycezhongqing authored Sep 17, 2024
1 parent 5b3e6cc commit 4f561f8
Show file tree
Hide file tree
Showing 5 changed files with 331 additions and 20 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ and what APIs have changed, if applicable.

## [Unreleased]

## [29.58.7] - 2024-09-13
- Add WildcardResourceSubscriber which could subscribe to all resources, like NODE and URIMap resources.

## [29.58.6] - 2024-09-08
- Allow for null paging inside Collection response envelopes

Expand Down Expand Up @@ -5728,7 +5731,8 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.58.6...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.58.7...master
[29.58.7]: https://github.com/linkedin/rest.li/compare/v29.58.6...v29.58.7
[29.58.6]: https://github.com/linkedin/rest.li/compare/v29.58.5...v29.58.6
[29.58.5]: https://github.com/linkedin/rest.li/compare/v29.58.4...v29.58.5
[29.58.4]: https://github.com/linkedin/rest.li/compare/v29.58.3...v29.58.4
Expand Down
116 changes: 110 additions & 6 deletions d2/src/main/java/com/linkedin/d2/xds/XdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,91 @@ final void onChanged(ResourceUpdate update)
}
}

public static abstract class WildcardResourceWatcher
{
private final ResourceType _type;

/**
* Defining a private constructor means only classes that are defined in this file can extend this class (see
* {@link ResourceWatcher}).
*/
WildcardResourceWatcher(ResourceType type)
{
_type = type;
}

final ResourceType getType()
{
return _type;
}

/**
* Called when the resource discovery RPC encounters some transient error.
*/
public abstract void onError(Status error);

/**
* Called when the resource discovery RPC reestablishes connection.
*/
public abstract void onReconnect();

/**
* Called when a resource is added or updated.
* @param resourceName the name of the resource that was added or updated.
* @param update the new data {@link ResourceUpdate} for the resource.
*/
abstract void onChanged(String resourceName, ResourceUpdate update);

/**
* Called when a resource is removed.
* @param resourceName the name of the resource that was removed.
*/
public abstract void onRemoval(String resourceName);
}

public static abstract class WildcardNodeResourceWatcher extends WildcardResourceWatcher
{
public WildcardNodeResourceWatcher()
{
super(ResourceType.NODE);
}

/**
* Called when a node resource is added or updated.
* @param resourceName the resource name of the {@link NodeUpdate} that was added or updated.
* @param update the new data for the {@link NodeUpdate}, including D2 cluster and service information.
*/
public abstract void onChanged(String resourceName, NodeUpdate update);

@Override
final void onChanged(String resourceName, ResourceUpdate update)
{
onChanged(resourceName, (NodeUpdate) update);
}
}

public static abstract class WildcardD2URIMapResourceWatcher extends WildcardResourceWatcher
{
public WildcardD2URIMapResourceWatcher()
{
super(ResourceType.D2_URI_MAP);
}

/**
* Called when a {@link D2URIMapUpdate} resource is added or updated.
* @param resourceName the resource name of the {@link D2URIMapUpdate} map resource that was added or updated.
* like the /d2/uris/clusterName
* @param update the new data for the {@link D2URIMapUpdate} resource
*/
public abstract void onChanged(String resourceName, D2URIMapUpdate update);

@Override
final void onChanged(String resourceName, ResourceUpdate update)
{
onChanged(resourceName, (D2URIMapUpdate) update);
}
}

public interface ResourceUpdate
{
boolean isValid();
Expand All @@ -109,7 +194,7 @@ public static final class NodeUpdate implements ResourceUpdate
_nodeData = nodeData;
}

XdsD2.Node getNodeData()
public XdsD2.Node getNodeData()
{
return _nodeData;
}
Expand Down Expand Up @@ -261,13 +346,32 @@ static ResourceType fromTypeUrl(String typeUrl)
* will always notify the given watcher of the current data if it is already present, even if the given watcher was
* already subscribed to said resource. However, the subscription will only be added once.
*/
abstract void watchXdsResource(String resourceName, ResourceWatcher watcher);
public abstract void watchXdsResource(String resourceName, ResourceWatcher watcher);

abstract void startRpcStream();
/**
* Subscribes the given {@link WildcardResourceWatcher} to all the resources of the corresponding type. The watcher
* will be notified whenever a resource is added or removed. Repeated calls to this function with the same watcher
* will always notify the given watcher of the current data.
*/
public abstract void watchAllXdsResources(WildcardResourceWatcher watcher);

abstract void shutdown();
/**
* Initiates the RPC stream to the xDS server.
*/
public abstract void startRpcStream();

/**
* Shuts down the xDS client.
*/
public abstract void shutdown();

abstract String getXdsServerAuthority();
/**
* Returns the authority of the xDS server.
*/
public abstract String getXdsServerAuthority();

abstract public XdsClientJmx getXdsClientJmx();
/**
* Returns the JMX bean for the xDS client.
*/
public abstract XdsClientJmx getXdsClientJmx();
}
Loading

0 comments on commit 4f561f8

Please sign in to comment.