ChipDeviceCtrl.py API

chip.ChipDeviceCtrl#

Chip Device Controller interface

RegisterOnActiveCallback#

def RegisterOnActiveCallback(scopedNodeId: ScopedNodeId,
                             callback: typing.Callable[[ScopedNodeId], None])

Registers a callback to be invoked when the device with the given (fabric index, node id) becomes active.

Does nothing if the callback is already registered.

UnregisterOnActiveCallback#

def UnregisterOnActiveCallback(scopedNodeId: ScopedNodeId,
                               callback: typing.Callable[[ScopedNodeId],
                                                         None])

Unregisters a callback that was registered for when the device with the given (fabric index, node id) becomes active.

Does nothing if the callback has not been registered.

WaitForCheckIn#

async def WaitForCheckIn(scopedNodeId: ScopedNodeId, timeoutSeconds: float)

Waits for a device to become active.

Returns:

  • A future that completes when the device becomes active.
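The register, wait, and unregister pattern described above can be sketched with plain asyncio. This is a minimal illustration, not the SDK's implementation: `ActiveCallbackRegistry`, `wait_for_check_in`, and the `ScopedNodeId` stand-in are hypothetical names, and raising `asyncio.TimeoutError` on timeout is a choice of this sketch rather than documented behavior.

```python
import asyncio
from typing import Callable, Dict, List, NamedTuple


class ScopedNodeId(NamedTuple):
    """Stand-in for the SDK type: a (fabric index, node id) pair."""
    fabricIndex: int
    nodeId: int


Callback = Callable[[ScopedNodeId], None]


class ActiveCallbackRegistry:
    """Hypothetical registry mirroring the documented register/unregister semantics."""

    def __init__(self) -> None:
        self._callbacks: Dict[ScopedNodeId, List[Callback]] = {}

    def register(self, node: ScopedNodeId, callback: Callback) -> None:
        callbacks = self._callbacks.setdefault(node, [])
        if callback not in callbacks:  # already registered: do nothing
            callbacks.append(callback)

    def unregister(self, node: ScopedNodeId, callback: Callback) -> None:
        callbacks = self._callbacks.get(node, [])
        if callback in callbacks:  # never registered: do nothing
            callbacks.remove(callback)

    def notify_active(self, node: ScopedNodeId) -> None:
        # Iterate over a copy so callbacks may unregister themselves.
        for callback in list(self._callbacks.get(node, [])):
            callback(node)


async def wait_for_check_in(registry: ActiveCallbackRegistry,
                            node: ScopedNodeId,
                            timeout_seconds: float) -> None:
    """Wait until `node` is reported active; raise asyncio.TimeoutError otherwise."""
    future = asyncio.get_running_loop().create_future()

    def on_active(_node: ScopedNodeId) -> None:
        if not future.done():
            future.set_result(None)

    registry.register(node, on_active)
    try:
        await asyncio.wait_for(future, timeout=timeout_seconds)
    finally:
        # Always clean up, whether the wait succeeded or timed out.
        registry.unregister(node, on_active)
```

The `try`/`finally` keeps the registry clean on both the success and timeout paths, which is why the unregister operation tolerating an unknown callback is convenient.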

CallbackContext#

class CallbackContext()

A context manager for handling callbacks that are expected to be called exactly once.

The context manager makes sure that no concurrent operations which use the same callback handlers are executed.
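One way to picture the idea is an async context manager that holds a lock for the lifetime of a one-shot future. The sketch below is an assumption-laden illustration, not the class documented here: `OneShotCallbackContext`, `complete`, and `run_operation` are hypothetical names.

```python
import asyncio
from typing import Any, Optional


class OneShotCallbackContext:
    """Hypothetical sketch: serialize operations that share a one-shot callback."""

    def __init__(self, lock: asyncio.Lock) -> None:
        self._lock = lock
        self._future: Optional[asyncio.Future] = None

    @property
    def future(self) -> Optional[asyncio.Future]:
        return self._future

    async def __aenter__(self) -> "OneShotCallbackContext":
        # Block until no other operation is using the shared callback.
        await self._lock.acquire()
        self._future = asyncio.get_running_loop().create_future()
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Late callbacks find no future to complete.
        self._future = None
        self._lock.release()

    def complete(self, result: Any) -> None:
        """Invoked by the callback; calls after the first one are ignored."""
        if self._future is not None and not self._future.done():
            self._future.set_result(result)


async def run_operation(ctx: OneShotCallbackContext, value: Any) -> Any:
    async with ctx as active:
        # Stand-in for the native layer invoking the callback later.
        asyncio.get_running_loop().call_soon(active.complete, value)
        return await active.future
```

Two concurrent `run_operation` calls on the same context run one after the other, because the second cannot enter the `async with` block until the first has exited.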

CommissioningContext#

class CommissioningContext(CallbackContext)

A context manager for handling commissioning callbacks that are expected to be called exactly once.

This context also resets commissioning related device controller state.

CommissionableNode#

class CommissionableNode(discovery.CommissionableNode)

Commission#

def Commission(nodeId: int, setupPinCode: int) -> int

Commission the device using the device controller that discovered this device.

nodeId: The node ID commissioned to the device.

setupPinCode: The setup pin code of the device.

Returns:

  • int - Effective Node ID of the device (as defined by the assigned NOC)

DeviceProxyWrapper#

class DeviceProxyWrapper()

Encapsulates a pointer to OperationalDeviceProxy on the C++ side that needs to be freed when DeviceProxyWrapper goes out of scope. If the wrapper is copied around, a double free could occur. With the way the wrapper is used today this does not need to be accounted for, and it would become apparent very quickly if it did happen.

ChipDeviceControllerBase#

class ChipDeviceControllerBase()

Shutdown#

def Shutdown()

Shuts down this controller and reclaims any used resources, including the bound C++ constructor instance in the SDK.

Raises:

  • ChipStackError - On failure.

Returns:

None

ShutdownAll#

def ShutdownAll()

Shut down all active controllers and reclaim any used resources.

CheckIsActive#

def CheckIsActive()

Checks if the device controller is active.

Raises:

  • RuntimeError - On failure.

IsConnected#

def IsConnected()

Checks if the device controller is connected.

Returns:

  • bool - True if connected, False if not connected.

Raises:

  • RuntimeError - If ‘_isActive’ is False (from the call to CheckIsActive).

ConnectBLE#

async def ConnectBLE(discriminator: int,
                     setupPinCode: int,
                     nodeid: int,
                     isShortDiscriminator: bool = False) -> int

Connect to a BLE device via PASE using the given discriminator and setup pin code.

Arguments:

  • discriminator int - The long discriminator for the DNS-SD advertisement. Valid range: 0-4095.

  • setupPinCode int - The setup pin code of the device.

  • nodeid int - Node id of the device.

  • isShortDiscriminator bool - Optional. Set to True if the given discriminator is a short discriminator (default is False).

Returns:

  • int - Effective Node ID of the device (as defined by the assigned NOC).
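A small helper can check the documented 0-4095 range before calling into the controller. The sketch below also derives a short discriminator from a long one, relying on background knowledge from the Matter specification (the short discriminator is the upper 4 bits of the 12-bit long discriminator) rather than on anything stated on this page. The function names are hypothetical.

```python
MAX_LONG_DISCRIMINATOR = 0xFFF  # 12-bit value, i.e. the documented 0-4095 range


def validate_long_discriminator(value: int) -> int:
    """Return `value` unchanged if it is a valid long discriminator."""
    if not 0 <= value <= MAX_LONG_DISCRIMINATOR:
        raise ValueError(f"discriminator {value} is outside 0-{MAX_LONG_DISCRIMINATOR}")
    return value


def to_short_discriminator(long_discriminator: int) -> int:
    """Return the upper 4 bits of a 12-bit long discriminator."""
    return validate_long_discriminator(long_discriminator) >> 8


print(to_short_discriminator(3840))  # 3840 == 0xF00, so this prints 15
```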

UnpairDevice#

async def UnpairDevice(nodeid: int) -> None

Unpairs the device with the specified node ID.

Arguments:

  • nodeid int - Node id of the device.

Returns:

None.

CloseBLEConnection#

def CloseBLEConnection()

Closes the BLE connection for the device controller.

Raises:

  • ChipStackError - On failure.

ExpireSessions#

def ExpireSessions(nodeid)

Close all sessions with nodeid (if any existed) so that sessions get re-established.

This is needed to properly handle operations that invalidate a node’s state, such as UpdateNOC.

WARNING: ONLY CALL THIS IF YOU UNDERSTAND THE SIDE-EFFECTS

Arguments:

  • nodeid int - Node id of the device.

Raises:

  • ChipStackError - On failure.

MarkSessionDefunct#

def MarkSessionDefunct(nodeid)

Marks a previously active session with the specified node as defunct to temporarily prevent it from being used with new exchanges to send or receive messages. This should be called when there is suspicion of a loss-of-sync with the session state on the associated peer, such as evidence of transport failure.

If messages are received thereafter on this session, the session will be put back into the Active state.

This function should only be called on an active session. This will NOT detach any existing SessionHolders.

Arguments:

  • nodeid int - The node ID of the device whose session should be marked as defunct.

Raises:

  • RuntimeError - If the controller is not active.

  • PyChipError - If the operation fails.

MarkSessionForEviction#

def MarkSessionForEviction(nodeid)

Marks the session with the specified node for eviction. It will first detach all SessionHolders attached to this session by calling ‘OnSessionReleased’ on each of them. This will force them to release their reference to the session. If there are no more references left, the session will then be de-allocated.

Once marked for eviction, the session SHALL NOT ever become active again.

Arguments:

  • nodeid int - The node ID of the device whose session should be marked for eviction.

Raises:

  • RuntimeError - If the controller is not active.

  • PyChipError - If the operation fails.
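The session rules described for MarkSessionDefunct and MarkSessionForEviction can be summarized as a small state machine. This is a toy model for illustration only: `ToySession` and `SessionState` are hypothetical, and raising `RuntimeError` when a non-active session is marked defunct is a modeling choice, not documented behavior.

```python
from enum import Enum, auto


class SessionState(Enum):
    ACTIVE = auto()
    DEFUNCT = auto()
    PENDING_EVICTION = auto()


class ToySession:
    """Toy model of the transitions described in the text above."""

    def __init__(self) -> None:
        self.state = SessionState.ACTIVE

    def mark_defunct(self) -> None:
        # The text says this should only be called on an active session.
        if self.state is not SessionState.ACTIVE:
            raise RuntimeError("only an active session can be marked defunct")
        self.state = SessionState.DEFUNCT

    def on_message_received(self) -> None:
        # A defunct session returns to Active when a message arrives on it.
        if self.state is SessionState.DEFUNCT:
            self.state = SessionState.ACTIVE

    def mark_for_eviction(self) -> None:
        # Once marked for eviction, the session never becomes active again.
        self.state = SessionState.PENDING_EVICTION


session = ToySession()
session.mark_defunct()
session.on_message_received()
print(session.state.name)  # ACTIVE
```

The key difference is reversibility: defunct is a temporary state that incoming traffic can undo, while eviction is final.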

DeleteAllSessionResumptionStorage#

def DeleteAllSessionResumptionStorage()

Remove all session resumption information associated with the fabric index of the controller.

Raises:

  • RuntimeError - If the controller is not active.

  • PyChipError - If the operation fails.

EstablishPASESessionBLE#

async def EstablishPASESessionBLE(setupPinCode: int, discriminator: int,
                                  nodeid: int) -> None

Establish a PASE session over BLE.

Arguments:

  • discriminator int - The long discriminator for the DNS-SD advertisement. Valid range: 0-4095.

  • setupPinCode int - The setup pin code of the device.

  • nodeid int - Node id of the device.

Returns:

None

EstablishPASESessionIP#

async def EstablishPASESessionIP(ipaddr: str,
                                 setupPinCode: int,
                                 nodeid: int,
                                 port: int = 0) -> None

Establish a PASE session over IP.

Arguments:

  • ipaddr str - IP address.

  • port int - IP port to use (default is 0).

  • setupPinCode int - The setup pin code of the device.

  • nodeid int - Node id of the device.

Returns:

None

EstablishPASESession#

async def EstablishPASESession(setUpCode: str, nodeid: int) -> None

Establish a PASE session using setUpCode.

Arguments:

  • setUpCode str - The setup code of the device.

  • nodeid int - Node id of the device.

Returns:

None

GetTestCommissionerUsed#

def GetTestCommissionerUsed()

Get the status of test commissioner in use.

Returns:

  • bool - True if the test commissioner is in use, False if not.

SetTestCommissionerSimulateFailureOnStage#

def SetTestCommissionerSimulateFailureOnStage(stage: int)

Simulates a failure on a specific stage of the test commissioner.

Arguments:

  • stage int - The commissioning stage at which to simulate the failure.

Returns:

  • bool - True if the simulated failure was set up successfully, False if not.

SetTestCommissionerSimulateFailureOnReport#

def SetTestCommissionerSimulateFailureOnReport(stage: int)

Simulates a failure on report of the test commissioner.

Arguments:

  • stage int - The commissioning stage at which to simulate the failure.

Returns:

  • bool - True if the simulated failure was set up successfully, False if not.

SetTestCommissionerPrematureCompleteAfter#

def SetTestCommissionerPrematureCompleteAfter(stage: int)

Makes the test commissioner complete prematurely after the given stage.

Arguments:

  • stage int - The commissioning stage after which to complete.

Returns:

  • bool - True if the premature completion was set up successfully, False if not.

CheckTestCommissionerCallbacks#

def CheckTestCommissionerCallbacks()

Check the test commissioner callbacks.

Returns:

  • bool - True if the test commissioner callbacks succeeded, False if not.

CheckStageSuccessful#

def CheckStageSuccessful(stage: int)

Check whether the given test commissioner stage succeeded.

Arguments:

  • stage int - The commissioning stage to check.

Returns:

  • bool - True if the test commissioner stage succeeded, False if not.

CheckTestCommissionerPaseConnection#

def CheckTestCommissionerPaseConnection(nodeid)

Check whether the test commissioner PASE connection succeeded.

Arguments:

  • nodeid int - Node id of the device.

Returns:

  • bool - True if the test commissioner PASE connection succeeded, False if not.

ResolveNode#

def ResolveNode(nodeid)

Resolve the node id.

Arguments:

  • nodeid int - Node id of the device.

GetAddressAndPort#

def GetAddressAndPort(nodeid)

Get the address and port.

Arguments:

  • nodeid int - Node id of the device.

Returns:

  • tuple - The address and port if no error occurs or None on failure.

DiscoverCommissionableNodes#

async def DiscoverCommissionableNodes(
    filterType: discovery.FilterType = discovery.FilterType.NONE,
    filter: typing.Any = None,
    stopOnFirst: bool = False,
    timeoutSecond: int = 5
) -> typing.Union[None, CommissionableNode, typing.List[CommissionableNode]]

Discover commissionable nodes via DNS-SD with specified filters. Supported filters are:

  • discovery.FilterType.NONE

  • discovery.FilterType.SHORT_DISCRIMINATOR

  • discovery.FilterType.LONG_DISCRIMINATOR

  • discovery.FilterType.VENDOR_ID

  • discovery.FilterType.DEVICE_TYPE

  • discovery.FilterType.COMMISSIONING_MODE

  • discovery.FilterType.INSTANCE_NAME

  • discovery.FilterType.COMMISSIONER

  • discovery.FilterType.COMPRESSED_FABRIC_ID

This function will always return a list of CommissionableNode. When stopOnFirst is set, this function will return when at least one device is discovered or on timeout.

Returns:

  • list - A list of discovered devices.

GetDiscoveredDevices#

async def GetDiscoveredDevices()

Get the discovered devices.

Returns:

  • list - A list of discovered devices.

GetIPForDiscoveredDevice#

def GetIPForDiscoveredDevice(idx, addrStr, length)

Get the IP address for a discovered device.

Arguments:

  • idx int - Index of the discovered device.

  • addrStr str - Address of the device.

  • length int - Length of the address.

Returns:

  • bool - True if the IP address for the discovered device was retrieved successfully, False if not.

OpenCommissioningWindow#

async def OpenCommissioningWindow(
        nodeid: int, timeout: int, iteration: int, discriminator: int,
        option: CommissioningWindowPasscode) -> CommissioningParameters

Opens a commissioning window on the device with the given nodeid.

Arguments:

  • nodeid int - Node id of the device.

  • timeout int - Command timeout.

  • iteration int - The PAKE iteration count associated with the PAKE Passcode ID and ephemeral PAKE passcode verifier to be used for this commissioning. Valid range: 1000 - 100000. Ignored if option == 0.

  • discriminator int - The long discriminator for the DNS-SD advertisement. Valid range: 0-4095. Ignored if option == 0.

  • option int - 0 = kOriginalSetupCode, 1 = kTokenWithRandomPIN.

Returns:

CommissioningParameters
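The argument ranges above lend themselves to a pre-flight check before the command is sent. The following is a minimal sketch under the assumption that the caller passes plain integers; `check_window_arguments` and the two constants are hypothetical helpers, not part of the controller API.

```python
ORIGINAL_SETUP_CODE = 0    # option 0 = kOriginalSetupCode
TOKEN_WITH_RANDOM_PIN = 1  # option 1 = kTokenWithRandomPIN


def check_window_arguments(iteration: int, discriminator: int, option: int) -> None:
    """Raise ValueError if the arguments fall outside the documented ranges."""
    if option not in (ORIGINAL_SETUP_CODE, TOKEN_WITH_RANDOM_PIN):
        raise ValueError(f"unknown option {option}")
    if option == ORIGINAL_SETUP_CODE:
        return  # iteration and discriminator are ignored for option 0
    if not 1000 <= iteration <= 100000:
        raise ValueError(f"iteration {iteration} is outside 1000-100000")
    if not 0 <= discriminator <= 4095:
        raise ValueError(f"discriminator {discriminator} is outside 0-4095")


check_window_arguments(iteration=1000, discriminator=3840, option=TOKEN_WITH_RANDOM_PIN)
check_window_arguments(iteration=0, discriminator=0, option=ORIGINAL_SETUP_CODE)
```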

GetCompressedFabricId#

def GetCompressedFabricId()

Get compressed fabric Id.

Returns:

  • int - The compressed fabric ID as a 64-bit integer.

Raises:

  • ChipStackError - On failure.

GetFabricIdInternal#

def GetFabricIdInternal() -> int

Get the fabric ID from the object. Only used to validate cached value from property.

Returns:

  • int - The raw fabric ID as a 64-bit integer.

Raises:

  • ChipStackError - On failure.

GetFabricIndexInternal#

def GetFabricIndexInternal() -> int

Get the fabric index from the object. Only used to validate cached value from property.

Returns:

  • int - fabric index in local fabric table associated with this controller.

Raises:

  • ChipStackError - On failure.

GetNodeIdInternal#

def GetNodeIdInternal() -> int

Get the node ID from the object. Only used to validate cached value from property.

Returns:

  • int - The Node ID as a 64 bit integer.

Raises:

  • ChipStackError - On failure.

GetRootPublicKeyBytesInternal#

def GetRootPublicKeyBytesInternal() -> bytes

Get the root public key associated with our fabric.

Returns:

  • bytes - The root public key raw bytes in uncompressed point form.

Raises:

  • ChipStackError - On failure.

GetClusterHandler#

def GetClusterHandler()

Get the cluster handler.

Returns:

  • ChipClusters - An instance of the ChipClusters class.

FindOrEstablishPASESession#

async def FindOrEstablishPASESession(setupCode: str,
                                     nodeid: int,
                                     timeoutMs: typing.Optional[int] = None
                                     ) -> typing.Optional[DeviceProxyWrapper]

Find or establish a PASE session.

Arguments:

  • setupCode str - The setup code of the device.

  • nodeid int - Node id of the device.

  • timeoutMs Optional[int] - Optional timeout in milliseconds.

Returns:

DeviceProxyWrapper on success, None otherwise.

GetConnectedDeviceSync#

def GetConnectedDeviceSync(
        nodeid,
        allowPASE=True,
        timeoutMs: typing.Optional[int] = None,
        payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD)

Gets an OperationalDeviceProxy or CommissioneeDeviceProxy for the specified Node.

Arguments:

  • nodeid int - Target’s Node ID

  • allowPASE bool - Get a device proxy of a device being commissioned.

  • timeoutMs Optional[int] - Timeout for a timed invoke request. Omit or set to ‘None’ to indicate a non-timed request.

Returns:

DeviceProxyWrapper on success.

WaitForActive#

async def WaitForActive(nodeid,
                        *,
                        timeoutSeconds=30.0,
                        stayActiveDurationMs=30000)

Waits for a LIT ICD device to become active. Once the device is active, sends a StayActive command to it to allow human operations.

Arguments:

  • nodeid - Node ID of the LIT ICD.

  • stayActiveDurationMs - The duration in the StayActive command, in milliseconds.

Returns:

StayActiveResponse on success

GetConnectedDevice#

async def GetConnectedDevice(
        nodeid,
        allowPASE: bool = True,
        timeoutMs: typing.Optional[int] = None,
        payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD)

Gets an OperationalDeviceProxy or CommissioneeDeviceProxy for the specified Node.

Arguments:

  • nodeid int - Target’s Node ID.

  • allowPASE bool - Get a device proxy of a device being commissioned.

  • timeoutMs Optional[int] - Timeout for a timed invoke request. Omit or set to ‘None’ to indicate a non-timed request.

Returns:

DeviceProxyWrapper on success.

ComputeRoundTripTimeout#

def ComputeRoundTripTimeout(nodeid, upperLayerProcessingTimeoutMs: int = 0)

Returns a computed timeout value based on the round-trip time it takes for the peer at the other end of the session to receive a message, process it and send it back. This is computed based on the session type, the type of transport, sleepy characteristics of the target and a caller-provided value for the time it takes to process a message at the upper layer on the target.

This will result in a session being established if one wasn’t already.

Returns:

  • int - The computed timeout value in milliseconds, representing the round-trip time.

GetRemoteSessionParameters#

def GetRemoteSessionParameters(nodeid) -> typing.Optional[SessionParameters]

Returns the SessionParameters reported by the remote node associated with nodeid. If an error occurs while getting the SessionParameters, None is returned.

This will result in a session being established if one wasn’t already established.

Returns:

  • Optional[SessionParameters] - The session parameters.

TestOnlySendBatchCommands#

async def TestOnlySendBatchCommands(
        nodeid: int,
        commands: typing.List[ClusterCommand.InvokeRequestInfo],
        timedRequestTimeoutMs: typing.Optional[int] = None,
        interactionTimeoutMs: typing.Optional[int] = None,
        busyWaitMs: typing.Optional[int] = None,
        suppressResponse: typing.Optional[bool] = None,
        remoteMaxPathsPerInvoke: typing.Optional[int] = None,
        suppressTimedRequestMessage: bool = False,
        commandRefsOverride: typing.Optional[typing.List[int]] = None)

Please see SendBatchCommands for description.

TestOnly overridable arguments:

remoteMaxPathsPerInvoke: Overrides the number of batch commands we think can be sent to the remote node.

suppressTimedRequestMessage: When set to true, we suppress sending the Timed Request Message.

commandRefsOverride: List of commandRefs to use for each command with the same index in commands.

Returns:

TestOnlyBatchCommandResponse

TestOnlySendCommandTimedRequestFlagWithNoTimedInvoke#

async def TestOnlySendCommandTimedRequestFlagWithNoTimedInvoke(
        nodeid: int,
        endpoint: int,
        payload: ClusterObjects.ClusterCommand,
        responseType=None)

Please see SendCommand for description.

Returns:

Command response. The type of the response is defined by the command.

Raises:

InteractionModelError on error

SendCommand#

async def SendCommand(
        nodeid: int,
        endpoint: int,
        payload: ClusterObjects.ClusterCommand,
        responseType=None,
        timedRequestTimeoutMs: typing.Optional[int] = None,
        interactionTimeoutMs: typing.Optional[int] = None,
        busyWaitMs: typing.Optional[int] = None,
        suppressResponse: typing.Optional[bool] = None,
        payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD)

Send a cluster-object encapsulated command to a node. The call can be awaited to receive the response. If a valid responseType is passed in, it is used to de-serialize the object. If not, the type is deduced automatically from the metadata received over the wire.

timedRequestTimeoutMs: Timeout for a timed invoke request. Omit or set to ‘None’ to indicate a non-timed request.

interactionTimeoutMs: Overall timeout for the interaction. Omit or set to ‘None’ to have the SDK automatically compute the right timeout value based on transport characteristics as well as the responsiveness of the target.

Returns:

command response. The type of the response is defined by the command.

Raises:

InteractionModelError on error

SendBatchCommands#

async def SendBatchCommands(
        nodeid: int,
        commands: typing.List[ClusterCommand.InvokeRequestInfo],
        timedRequestTimeoutMs: typing.Optional[int] = None,
        interactionTimeoutMs: typing.Optional[int] = None,
        busyWaitMs: typing.Optional[int] = None,
        suppressResponse: typing.Optional[bool] = None,
        payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD)

Send a batch of cluster-object encapsulated commands to a node. The call can be awaited to receive the responses. If a valid responseType is passed in, it is used to de-serialize the object. If not, the type is deduced automatically from the metadata received over the wire.

nodeid: Target’s Node ID.

commands: A list of InvokeRequestInfo containing the commands to invoke.

timedRequestTimeoutMs: Timeout for a timed invoke request. Omit or set to ‘None’ to indicate a non-timed request.

interactionTimeoutMs: Overall timeout for the interaction. Omit or set to ‘None’ to have the SDK automatically compute the right timeout value based on transport characteristics as well as the responsiveness of the target.

busyWaitMs: How long to wait, in milliseconds, after sending the command to the device before performing any other operations.

suppressResponse: Do not send a response to this action.

Returns:

  • List of command responses in the same order as what was given in commands. The type of the response is defined by the command.

  • A value of None indicates success.

  • If only a single command fails, for example with UNSUPPORTED_COMMAND, the corresponding index associated with the command will contain interaction_model.Status.UnsupportedCommand.

  • If a command is not responded to by the server, the corresponding entry will contain interaction_model.Status.NoCommandResponse.

Raises:

  • InteractionModelError if sending the InvokeRequestMessage fails as a whole.
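Because each entry in the returned list can be None, a status value, or a response object, callers typically walk the list by index. The sketch below shows one way to do that. The `Status` enum is a two-member stand-in for interaction_model.Status, and `summarize_batch` is a hypothetical helper; neither is part of the SDK.

```python
from enum import Enum, auto
from typing import Any, List


class Status(Enum):
    """Stand-in for interaction_model.Status with only the members named above."""
    UnsupportedCommand = auto()
    NoCommandResponse = auto()


def summarize_batch(responses: List[Any]) -> List[str]:
    """Describe each entry of a batch result, in the order the commands were given."""
    summary = []
    for index, response in enumerate(responses):
        if response is None:
            # None means the command succeeded without a response payload.
            summary.append(f"{index}: success")
        elif isinstance(response, Status):
            # A status value means this particular command failed.
            summary.append(f"{index}: failed ({response.name})")
        else:
            # Anything else is a command-specific response object.
            summary.append(f"{index}: response ({type(response).__name__})")
    return summary


for line in summarize_batch([None, Status.UnsupportedCommand, Status.NoCommandResponse]):
    print(line)
```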

SendGroupCommand#

def SendGroupCommand(groupid: int,
                     payload: ClusterObjects.ClusterCommand,
                     busyWaitMs: typing.Optional[int] = None)

Send a cluster-object encapsulated command to the group identified by groupid and get back a future that can be awaited to confirm that the command was sent.

Returns:

  • None - responses are not sent to group commands.

Raises:

InteractionModelError on error.

WriteAttribute#

async def WriteAttribute(
        nodeid: int,
        attributes: typing.List[typing.Tuple[
            int, ClusterObjects.ClusterAttributeDescriptor]],
        timedRequestTimeoutMs: typing.Optional[int] = None,
        interactionTimeoutMs: typing.Optional[int] = None,
        busyWaitMs: typing.Optional[int] = None,
        payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD)

Write a list of attributes on a target node.

nodeid: Target’s Node ID.

attributes: A list of tuples of type (endpoint, cluster-object). E.g. (1, Clusters.UnitTesting.Attributes.XYZAttribute(‘hello’)) – Write ‘hello’ to the XYZ attribute on the test cluster to endpoint 1.

timedRequestTimeoutMs: Timeout for a timed write request. Omit or set to ‘None’ to indicate a non-timed request.

interactionTimeoutMs: Overall timeout for the interaction. Omit or set to ‘None’ to have the SDK automatically compute the right timeout value based on transport characteristics as well as the responsiveness of the target.

Returns:

[AttributeStatus] (list - one for each path).

Raises:

InteractionModelError on error.

WriteGroupAttribute#

def WriteGroupAttribute(groupid: int,
                        attributes: typing.List[typing.Tuple[
                            ClusterObjects.ClusterAttributeDescriptor, int]],
                        busyWaitMs: typing.Optional[int] = None)

Write a list of attributes on a target group.

groupid: Group ID to send write attribute to.

attributes: A list of tuples of type (cluster-object, data-version). The data-version can be omitted. E.g. (Clusters.UnitTesting.Attributes.XYZAttribute(‘hello’), 1) – Group Write ‘hello’ with data version 1.

Returns:

list - An empty list.

Raises:

InteractionModelError on error.

TestOnlyPrepareToReceiveBdxData#

def TestOnlyPrepareToReceiveBdxData() -> asyncio.Future

Sets up the system to expect a node to initiate a BDX transfer. The transfer will send data here.

Returns:

A future that will yield a BdxTransfer with the init message from the transfer.

Raises:

InteractionModelError on error.

TestOnlyPrepareToSendBdxData#

def TestOnlyPrepareToSendBdxData(data: bytes) -> asyncio.Future

Sets up the system to expect a node to initiate a BDX transfer. The transfer will send data to the node.

Returns:

A future that will yield a BdxTransfer with the init message from the transfer.

Raises:

InteractionModelError on error.

Read#

async def Read(
        nodeid: int,
        attributes: typing.
    Optional[typing.List[typing.Union[
        None,  # Empty tuple, all wildcard
        typing.Tuple[int],  # Endpoint
        # Wildcard endpoint, Cluster id present
        typing.Tuple[typing.Type[ClusterObjects.Cluster]],
        # Wildcard endpoint, Cluster + Attribute present
        typing.Tuple[typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
        # Wildcard attribute id
        typing.Tuple[int, typing.Type[ClusterObjects.Cluster]],
        # Concrete path
        typing.Tuple[int,
                     typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
        # Directly specified attribute path
        ClusterAttribute.AttributePath]]] = None,
        dataVersionFilters: typing.Optional[typing.List[typing.Tuple[
            int, typing.Type[ClusterObjects.Cluster], int]]] = None,
        events: typing.Optional[typing.List[typing.Union[
            None,  # Empty tuple, all wildcard
            typing.Tuple[str, int],  # all wildcard with urgency set
            typing.Tuple[int, int],  # Endpoint,
            # Wildcard endpoint, Cluster id present
            typing.Tuple[typing.Type[ClusterObjects.Cluster], int],
            # Wildcard endpoint, Cluster + Event present
            typing.Tuple[typing.Type[ClusterObjects.ClusterEvent], int],
            # Wildcard event id
            typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int],
            # Concrete path
            typing.Tuple[int, typing.Type[ClusterObjects.ClusterEvent],
                         int]]]] = None,
        eventNumberFilter: typing.Optional[int] = None,
        returnClusterObject: bool = False,
        reportInterval: typing.Optional[typing.Tuple[int, int]] = None,
        fabricFiltered: bool = True,
        keepSubscriptions: bool = False,
        autoResubscribe: bool = True,
        payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD)

Read a list of attributes and/or events from a target node.

nodeid: Target’s Node ID.

attributes: A list of tuples of varying types depending on the type of read being requested:

  1. (endpoint, Clusters.ClusterA.AttributeA): Endpoint = specific, Cluster = specific, Attribute = specific
  2. (endpoint, Clusters.ClusterA): Endpoint = specific, Cluster = specific, Attribute = *
  3. (Clusters.ClusterA.AttributeA): Endpoint = *, Cluster = specific, Attribute = specific
  4. endpoint: Endpoint = specific, Cluster = *, Attribute = *
  5. Clusters.ClusterA: Endpoint = *, Cluster = specific, Attribute = *
  6. ‘*’ or (): Endpoint = *, Cluster = *, Attribute = *

The cluster and attributes specified above are to be selected from the generated cluster objects.

e.g.

ReadAttribute(1, [ 1 ] ) – case 4 above.
ReadAttribute(1, [ Clusters.BasicInformation ] ) – case 5 above.
ReadAttribute(1, [ (1, Clusters.BasicInformation.Attributes.Location) ] ) – case 1 above.

An AttributePath can also be specified directly by [chip.cluster.Attribute.AttributePath(…)]

dataVersionFilters: A list of tuples of (endpoint, cluster, data version).

events: A list of tuples of varying types depending on the type of read being requested:

  1. (endpoint, Clusters.ClusterA.EventA, urgent): Endpoint = specific, Cluster = specific, Event = specific, Urgent = True/False
  2. (endpoint, Clusters.ClusterA, urgent): Endpoint = specific, Cluster = specific, Event = *, Urgent = True/False
  3. (Clusters.ClusterA.EventA, urgent): Endpoint = *, Cluster = specific, Event = specific, Urgent = True/False
  4. endpoint: Endpoint = specific, Cluster = *, Event = *, Urgent = True/False
  5. Clusters.ClusterA: Endpoint = *, Cluster = specific, Event = *, Urgent = True/False
  6. ‘*’ or (): Endpoint = *, Cluster = *, Event = *, Urgent = True/False

eventNumberFilter: Optional minimum event number filter.

returnClusterObject: This returns the data as consolidated cluster objects, with all attributes for a cluster inside a single cluster-wide cluster object.

reportInterval: A tuple of two ints for (MinIntervalFloor, MaxIntervalCeiling). Used when establishing subscriptions. When not provided, a read request will be sent.

fabricFiltered: If True (default), the read/subscribe is fabric-filtered and will only see things associated with the fabric of the reader/subscriber. Relevant for attributes with fabric-scoped data.

keepSubscriptions: Keep existing subscriptions. If set to False, existing subscriptions with this node will get cancelled and a new one gets set up.

autoResubscribe: Automatically resubscribe to the subscription if the subscription is lost. The automatic re-subscription only applies if the subscription is established on the first try. If the first subscription establishment attempt fails, the function returns right away.

Returns:

  • AsyncReadTransaction.ReadResponse. Please see ReadAttribute and ReadEvent for examples of how to access data.

Raises:

  • InteractionModelError (chip.interaction_model) on error
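The attribute path forms accepted by Read can be understood as a mapping onto an (endpoint, cluster, attribute) triple in which a missing component is a wildcard. The classifier below is only a sketch of that mapping: the `Cluster`, `ClusterAttributeDescriptor`, `BasicInformation`, and `Location` classes are simplified stand-ins for the generated cluster objects, the `cluster` class attribute linking an attribute to its cluster is an assumption of this sketch, and `classify_attribute_path` is a hypothetical helper.

```python
from typing import Any, Optional, Tuple


class Cluster:
    """Stand-in for ClusterObjects.Cluster."""


class ClusterAttributeDescriptor:
    """Stand-in for ClusterObjects.ClusterAttributeDescriptor."""
    cluster: type = Cluster  # owning cluster (assumption of this sketch)


class BasicInformation(Cluster):
    pass


class Location(ClusterAttributeDescriptor):
    cluster = BasicInformation


Triple = Tuple[Optional[int], Optional[type], Optional[type]]


def classify_attribute_path(path: Any) -> Triple:
    """Map a path form to (endpoint, cluster, attribute); None means wildcard."""
    if isinstance(path, tuple) and len(path) == 1:
        path = path[0]  # (x,) is treated like x
    if path == "*" or path == ():
        return (None, None, None)
    if isinstance(path, int):
        return (path, None, None)
    endpoint, target = path if isinstance(path, tuple) else (None, path)
    if isinstance(target, type) and issubclass(target, ClusterAttributeDescriptor):
        return (endpoint, target.cluster, target)
    if isinstance(target, type) and issubclass(target, Cluster):
        return (endpoint, target, None)
    raise TypeError(f"unsupported attribute path: {path!r}")


print(classify_attribute_path((1, Location)))     # concrete path
print(classify_attribute_path(BasicInformation))  # wildcard endpoint and attribute
```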

ReadAttribute#

async def ReadAttribute(
        nodeid: int,
        attributes: typing.
    Optional[typing.List[typing.Union[
        None,  # Empty tuple, all wildcard
        typing.Tuple[int],  # Endpoint
        # Wildcard endpoint, Cluster id present
        typing.Tuple[typing.Type[ClusterObjects.Cluster]],
        # Wildcard endpoint, Cluster + Attribute present
        typing.Tuple[typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
        # Wildcard attribute id
        typing.Tuple[int, typing.Type[ClusterObjects.Cluster]],
        # Concrete path
        typing.Tuple[int,
                     typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
        # Directly specified attribute path
        ClusterAttribute.AttributePath]]],
        dataVersionFilters: typing.Optional[typing.List[typing.Tuple[
            int, typing.Type[ClusterObjects.Cluster], int]]] = None,
        returnClusterObject: bool = False,
        reportInterval: typing.Optional[typing.Tuple[int, int]] = None,
        fabricFiltered: bool = True,
        keepSubscriptions: bool = False,
        autoResubscribe: bool = True,
        payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD)

Read a list of attributes from a target node. This is a wrapper around DeviceController.Read().

nodeid: Target’s Node ID.

attributes: A list of tuples of varying types depending on the type of read being requested:

  1. (endpoint, Clusters.ClusterA.AttributeA): Endpoint = specific, Cluster = specific, Attribute = specific
  2. (endpoint, Clusters.ClusterA): Endpoint = specific, Cluster = specific, Attribute = *
  3. (Clusters.ClusterA.AttributeA): Endpoint = *, Cluster = specific, Attribute = specific
  4. endpoint: Endpoint = specific, Cluster = *, Attribute = *
  5. Clusters.ClusterA: Endpoint = *, Cluster = specific, Attribute = *
  6. ‘*’ or (): Endpoint = *, Cluster = *, Attribute = *

The cluster and attributes specified above are to be selected from the generated cluster objects.

e.g.

ReadAttribute(1, [ 1 ] ) – case 4 above.
ReadAttribute(1, [ Clusters.BasicInformation ] ) – case 5 above.
ReadAttribute(1, [ (1, Clusters.BasicInformation.Attributes.Location) ] ) – case 1 above.

An AttributePath can also be specified directly by [chip.cluster.Attribute.AttributePath(…)]

returnClusterObject: This returns the data as consolidated cluster objects, with all attributes for a cluster inside a single cluster-wide cluster object.

reportInterval: A tuple of two ints for (MinIntervalFloor, MaxIntervalCeiling). Used when establishing subscriptions. When not provided, a read request will be sent.

fabricFiltered: If True (default), the read/subscribe is fabric-filtered and will only see things associated with the fabric of the reader/subscriber. Relevant for attributes with fabric-scoped data.

keepSubscriptions: Keep existing subscriptions. If set to False, existing subscriptions with this node will get cancelled and a new one gets set up.

autoResubscribe: Automatically resubscribe to the subscription if the subscription is lost. The automatic re-subscription only applies if the subscription is established on the first try. If the first subscription establishment attempt fails, the function returns right away.

Returns:

  • subscription request: ClusterAttribute.SubscriptionTransaction To get notified on attribute change use SetAttributeUpdateCallback on the returned SubscriptionTransaction. This is used to set a callback function, which is a callable of type Callable[[TypedAttributePath, SubscriptionTransaction], None] Get the attribute value from the change path using GetAttribute on the SubscriptionTransaction You can await changes in the main loop using a trigger mechanism from the callback. ex. queue.SimpleQueue

  • read request: AsyncReadTransaction.ReadResponse.attributes. This is of type AttributeCache.attributeCache (Attribute.py), which is a dict mapping endpoints to a list of Cluster (ClusterObjects.py) classes (dict[int, List[Cluster]]). Access as returned_object[endpoint_id][<Cluster class>][<Attribute class>]. For example, to access the OnTime attribute of the OnOff cluster on endpoint 1: returned_object[1][Clusters.OnOff][Clusters.OnOff.Attributes.OnTime]
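Following the access pattern shown for read requests, a result can be treated as a nested mapping keyed by endpoint, then cluster class, then attribute class. The sketch below only illustrates that shape: `OnOff` is a hypothetical stand-in for Clusters.OnOff, the value 30 is made up, and `get_attribute` is a helper introduced here, not part of the API.

```python
# Hypothetical stand-in for Clusters.OnOff and its OnTime attribute (not the chip package).
class OnOff:
    class Attributes:
        class OnTime:
            pass

# Shape implied by the documented access pattern: endpoint -> cluster -> attribute -> value.
returned_object = {
    1: {OnOff: {OnOff.Attributes.OnTime: 30}},
}

def get_attribute(result, endpoint, cluster, attribute, default=None):
    """Look up one attribute value, returning `default` if any level is missing."""
    return result.get(endpoint, {}).get(cluster, {}).get(attribute, default)

on_time = get_attribute(returned_object, 1, OnOff, OnOff.Attributes.OnTime)
missing = get_attribute(returned_object, 2, OnOff, OnOff.Attributes.OnTime)
print(on_time, missing)
```

A wildcard read may return endpoints or clusters the caller did not expect, so a lookup that tolerates missing keys is often more convenient than chained indexing.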

Raises:

  • InteractionModelError (chip.interaction_model) on error

ReadEvent#

async def ReadEvent(
        nodeid: int,
        events: typing.List[typing.Union[
            None,  # Empty tuple, all wildcard
            typing.Tuple[str, int],  # all wildcard with urgency set
            typing.Tuple[int, int],  # Endpoint,
            # Wildcard endpoint, Cluster id present
            typing.Tuple[typing.Type[ClusterObjects.Cluster], int],
            # Wildcard endpoint, Cluster + Event present
            typing.Tuple[typing.Type[ClusterObjects.ClusterEvent], int],
            # Wildcard event id
            typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int],
            # Concrete path
            typing.Tuple[int, typing.Type[ClusterObjects.ClusterEvent], int]]],
        eventNumberFilter: typing.Optional[int] = None,
        fabricFiltered: bool = True,
        reportInterval: typing.Optional[typing.Tuple[int, int]] = None,
        keepSubscriptions: bool = False,
        autoResubscribe: bool = True,
        payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD)

Reads a list of events from a target node. This is a wrapper around DeviceController.Read().

nodeId: Target’s Node ID

events: A list of tuples of varying types depending on the type of read being requested:

  1. (endpoint, Clusters.ClusterA.EventA, urgent): Endpoint = specific, Cluster = specific, Event = specific, Urgent = True/False

  2. (endpoint, Clusters.ClusterA, urgent): Endpoint = specific, Cluster = specific, Event = *, Urgent = True/False

  3. (Clusters.ClusterA.EventA, urgent): Endpoint = *, Cluster = specific, Event = specific, Urgent = True/False

  4. endpoint: Endpoint = specific, Cluster = *, Event = *, Urgent = True/False

  5. Clusters.ClusterA: Endpoint = *, Cluster = specific, Event = *, Urgent = True/False

  6. '*' or (): Endpoint = *, Cluster = *, Event = *, Urgent = True/False

The cluster and events specified above are to be selected from the generated cluster objects.

e.g.

  • ReadEvent(1, [ 1 ] ) – case 4 above.

  • ReadEvent(1, [ Clusters.BasicInformation ] ) – case 5 above.

  • ReadEvent(1, [ (1, Clusters.BasicInformation.Events.Location) ] ) – case 1 above.

eventNumberFilter: Optional minimum event number filter.

reportInterval: A tuple of two ints (MinIntervalFloor, MaxIntervalCeiling), used when establishing subscriptions. When not provided, a read request is sent instead.

keepSubscriptions: Keep existing subscriptions. If set to False, existing subscriptions with this node are cancelled and a new one is set up.

autoResubscribe: Automatically resubscribe if the subscription is lost. Automatic re-subscription only applies if the subscription is established on the first try; if the first establishment attempt fails, the function returns right away.

Returns:

  • subscription request: ClusterAttribute.SubscriptionTransaction. To be notified of events, call SetEventUpdateCallback on the returned SubscriptionTransaction with a callable of type Callable[[EventReadResult, SubscriptionTransaction], None]. To await events in the main loop, have the callback trigger a hand-off mechanism such as queue.SimpleQueue.

  • read request: AsyncReadTransaction.ReadResponse.events. This is a List[ClusterEvent].
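The queue.SimpleQueue hand-off mentioned for subscriptions can be sketched as follows. This is an illustration under stated assumptions rather than real controller code: `FakeSubscription` and its `simulate_report` method are hypothetical stand-ins for a SubscriptionTransaction, and the event payload is a made-up dict.

```python
import queue
import threading

class FakeSubscription:
    """Hypothetical stand-in for a SubscriptionTransaction (not the chip package)."""

    def __init__(self):
        self._callback = None

    def SetEventUpdateCallback(self, callback):
        self._callback = callback

    def simulate_report(self, event):
        # Stand-in for a report being delivered from another thread.
        threading.Thread(target=self._callback, args=(event, self)).start()

events = queue.SimpleQueue()
subscription = FakeSubscription()

# The callback only hands the event over; the main loop decides what to do with it.
subscription.SetEventUpdateCallback(lambda event, transaction: events.put(event))

subscription.simulate_report({"name": "StartUp", "number": 1})

# Block until the callback fires, or raise queue.Empty after the timeout.
received = events.get(timeout=5)
print(received)
```

Keeping the callback to a single `put` means it returns quickly, and the timeout on `get` stops the main loop from waiting forever if no report arrives.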

Raises:

  • InteractionModelError (chip.interaction_model) on error

SetIpk#

def SetIpk(ipk: bytes)

Sets the Identity Protection Key (IPK) for the device controller.

Raises:

  • ChipStackError - On failure.

InitGroupTestingData#

def InitGroupTestingData()

Populates the Device Controller’s GroupDataProvider with known test group info and keys.

Raises:

  • ChipStackError - On failure.

CreateManualCode#

def CreateManualCode(discriminator: int, passcode: int) -> str

Creates a standard flow manual code from the given discriminator and passcode.

Arguments:

  • discriminator int - The long discriminator for the DNS-SD advertisement. Valid range: 0-4095.

  • passcode int - The setup passcode of the device.

Returns:

  • str - The manual code, decoded from the output buffer as a string.

Raises:

  • MemoryError - If the output size is invalid during manual code creation.
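Because the discriminator argument has a documented valid range of 0-4095 (12 bits), a caller may want to check it before calling CreateManualCode. The helpers below are hypothetical and not part of the API. The second one relies on the Matter convention that a manual pairing code carries only the short discriminator, which is the upper 4 bits of the 12-bit value.

```python
def check_long_discriminator(discriminator: int) -> int:
    """Return the discriminator unchanged if it fits in 12 bits, else raise ValueError."""
    if not 0 <= discriminator <= 0xFFF:
        raise ValueError(f"discriminator {discriminator} is outside 0-4095")
    return discriminator

def short_discriminator(long_discriminator: int) -> int:
    """Upper 4 bits of the 12-bit discriminator, as carried by a manual pairing code."""
    return check_long_discriminator(long_discriminator) >> 8

print(short_discriminator(3840))  # 0xF00 -> 15
```

One consequence is that two devices whose long discriminators share the same upper 4 bits cannot be told apart by the manual code alone.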

ChipDeviceController#

class ChipDeviceController(ChipDeviceControllerBase)

The ChipDeviceCommissioner binding, named as ChipDeviceController

TODO: This class contains DEPRECATED functions; the test scripts should be updated to avoid using them.

Commission#

async def Commission(nodeid) -> int

Start the auto-commissioning process on a node after establishing a PASE connection. This function is intended to be used in conjunction with EstablishPASESessionBLE or EstablishPASESessionIP. It can be called either before or after the DevicePairingDelegate receives the OnPairingComplete call. Commissioners that want to perform simple auto-commissioning should use the supplied “CommissionWithCode” function, which will establish the PASE connection and commission automatically.

Arguments:

  • nodeid int - Node id of the device.

Raises:

  • ChipStackError - On failure.

Returns:

  • int - Effective Node ID of the device (as defined by the assigned NOC).

CommissionThread#

async def CommissionThread(discriminator,
                           setupPinCode,
                           nodeId,
                           threadOperationalDataset: bytes,
                           isShortDiscriminator: bool = False) -> int

Commissions a Thread device over BLE.

Arguments:

  • discriminator int - The long discriminator for the DNS-SD advertisement. Valid range: 0-4095.

  • setupPinCode int - The setup pin code of the device.

  • nodeId int - Node id of the device.

  • threadOperationalDataset bytes - The Thread operational dataset for commissioning.

  • isShortDiscriminator bool - Whether the given discriminator is a short discriminator.

Returns:

  • int - Effective Node ID of the device (as defined by the assigned NOC).

CommissionWiFi#

async def CommissionWiFi(discriminator,
                         setupPinCode,
                         nodeId,
                         ssid: str,
                         credentials: str,
                         isShortDiscriminator: bool = False) -> int

Commissions a Wi-Fi device over BLE.

Arguments:

  • discriminator int - The long discriminator for the DNS-SD advertisement. Valid range: 0-4095.

  • setupPinCode int - The setup pin code of the device.

  • nodeId int - Node id of the device.

  • ssid str - SSID of the WiFi network.

  • credentials str - WiFi network password.

  • isShortDiscriminator bool - Whether the given discriminator is a short discriminator.

Returns:

  • int - Effective Node ID of the device (as defined by the assigned NOC).

SetWiFiCredentials#

def SetWiFiCredentials(ssid: str, credentials: str)

Set the Wi-Fi credentials to set during commissioning.

Arguments:

  • ssid str - SSID of the WiFi network.

  • credentials str - WiFi network password.

Raises:

  • ChipStackError - On failure.

SetThreadOperationalDataset#

def SetThreadOperationalDataset(threadOperationalDataset)

Set the Thread operational dataset to set during commissioning.

Arguments:

  • threadOperationalDataset bytes - The Thread operational dataset for commissioning.

Raises:

  • ChipStackError - On failure.

ResetCommissioningParameters#

def ResetCommissioningParameters()

Sets the commissioning parameters back to the default values.

Raises:

  • ChipStackError - On failure.

SetTimeZone#

def SetTimeZone(offset: int, validAt: int, name: str = "")

Set the time zone to set during commissioning. Currently only one time zone entry is supported.

Arguments:

  • offset int - Timezone offset.

  • validAt int - Timestamp of the timezone.

  • name str - Name or label of the timezone.

Raises:

  • ChipStackError - On failure.

SetDSTOffset#

def SetDSTOffset(offset: int, validStarting: int, validUntil: int)

Set the DST offset to set during commissioning. Currently only one DST entry is supported.

Arguments:

  • offset int - DST offset.

  • validStarting int - The start timestamp.

  • validUntil int - The end timestamp.

Raises:

  • ChipStackError - On failure.
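The argument descriptions for SetTimeZone and SetDSTOffset do not state units. Assuming the values are handed unchanged to the Time Synchronization cluster, where offsets are expressed in seconds and timestamps in microseconds since the Matter epoch (2000-01-01 00:00:00 UTC), the arguments could be prepared as sketched below. `to_matter_epoch_us` is a hypothetical helper and the dates are example values only.

```python
from datetime import datetime, timezone

# Assumption: timestamps are microseconds since 2000-01-01T00:00:00Z (the Matter epoch).
MATTER_EPOCH = datetime(2000, 1, 1, tzinfo=timezone.utc)

def to_matter_epoch_us(moment: datetime) -> int:
    """Convert a timezone-aware datetime to microseconds since the Matter epoch."""
    if moment.tzinfo is None:
        raise ValueError("moment must be timezone-aware")
    delta = moment - MATTER_EPOCH
    return (delta.days * 86_400 + delta.seconds) * 1_000_000 + delta.microseconds

# Hypothetical values: a UTC+1 zone with one hour of DST for part of 2024.
tz_offset_seconds = 1 * 3600
dst_offset_seconds = 1 * 3600
valid_starting = to_matter_epoch_us(datetime(2024, 3, 31, 1, 0, tzinfo=timezone.utc))
valid_until = to_matter_epoch_us(datetime(2024, 10, 27, 1, 0, tzinfo=timezone.utc))
print(valid_starting, valid_until)
```

Integer arithmetic on the timedelta fields avoids the rounding that `total_seconds()` would introduce through floating point.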

SetTCAcknowledgements#

def SetTCAcknowledgements(tcAcceptedVersion: int, tcUserResponse: int)

Set the TC acknowledgements to set during commissioning.

Arguments:

  • tcAcceptedVersion int - TC accepted version.

  • tcUserResponse int - TC user response.

Raises:

  • ChipStackError - On failure.

SetSkipCommissioningComplete#

def SetSkipCommissioningComplete(skipCommissioningComplete: bool)

Set whether to skip the commissioning complete callback.

Arguments:

  • skipCommissioningComplete bool - Whether to skip commissioning complete.

Raises:

  • ChipStackError - On failure.

SetDefaultNTP#

def SetDefaultNTP(defaultNTP: str)

Set the DefaultNTP to set during commissioning.

Arguments:

  • defaultNTP str - The default NTP.

Raises:

  • ChipStackError - On failure.

SetTrustedTimeSource#

def SetTrustedTimeSource(nodeId: int, endpoint: int)

Set the trusted time source nodeId to set during commissioning. This must be a node on the commissioner fabric.

Arguments:

  • nodeId int - Node id of the device.

  • endpoint int - Endpoint of the device.

Raises:

  • ChipStackError - On failure.

SetCheckMatchingFabric#

def SetCheckMatchingFabric(check: bool)

Instructs the auto-commissioner to perform a matching fabric check before commissioning.

Arguments:

  • check bool - Whether to check for a matching fabric before commissioning.

Raises:

  • ChipStackError - On failure.

GenerateICDRegistrationParameters#

def GenerateICDRegistrationParameters()

Generates ICD registration parameters for this controller.

Returns:

  • ICDRegistrationParameters - An object containing the generated parameters including symmetricKey, checkInNodeId, monitoredSubject, stayActiveMs, and clientType.

EnableICDRegistration#

def EnableICDRegistration(parameters: ICDRegistrationParameters)

Enables ICD registration for the following commissioning session.

Arguments:

  • parameters - An ICDRegistrationParameters object with the parameters used for ICD registration, or None for default arguments.

Raises:

  • ChipStackError - On failure.

DisableICDRegistration#

def DisableICDRegistration()

Disables ICD registration.

Raises:

  • ChipStackError - On failure.

GetFabricCheckResult#

def GetFabricCheckResult() -> int

Returns the fabric check result if SetCheckMatchingFabric was used.

Returns:

  • int - The fabric check result, or -1 if no check was performed.

CommissionOnNetwork#

async def CommissionOnNetwork(
        nodeId: int,
        setupPinCode: int,
        filterType: DiscoveryFilterType = DiscoveryFilterType.NONE,
        filter: typing.Any = None,
        discoveryTimeoutMsec: int = 30000) -> int

Runs the on-network commissioning routine, with a filter for mDNS discovery. Supported filters are:

  • DiscoveryFilterType.NONE

  • DiscoveryFilterType.SHORT_DISCRIMINATOR

  • DiscoveryFilterType.LONG_DISCRIMINATOR

  • DiscoveryFilterType.VENDOR_ID

  • DiscoveryFilterType.DEVICE_TYPE

  • DiscoveryFilterType.COMMISSIONING_MODE

  • DiscoveryFilterType.INSTANCE_NAME

  • DiscoveryFilterType.COMMISSIONER

  • DiscoveryFilterType.COMPRESSED_FABRIC_ID

The filter can be an integer, a string or None depending on the actual type of selected filter.

Raises:

  • ChipStackError - On failure.

Returns:

  • int - Effective Node ID of the device (as defined by the assigned NOC)

get_rcac#

def get_rcac()

Passes captured RCAC data back to Python test modules for validation.

The buffer size is set to the maximum size mentioned in the spec. Ref: https://github.com/CHIP-Specifications/connectedhomeip-spec/blob/06c4d55962954546ecf093c221fe1dab57645028/policies/matter_certificate_policy.adoc#615-key-sizes

Returns:

  • bytes - A bytes object containing the RCAC, or None if no data.

CommissionWithCode#

async def CommissionWithCode(
        setupPayload: str,
        nodeid: int,
        discoveryType: DiscoveryType = DiscoveryType.DISCOVERY_ALL) -> int

Commission with the given nodeid from the setupPayload. setupPayload may be a QR or manual code.

Arguments:

  • setupPayload str - The setup payload (QR or manual code).

  • nodeid int - Node id of the device.

  • discoveryType DiscoveryType - The discovery type to use. Defaults to DiscoveryType.DISCOVERY_ALL.

Raises:

  • ChipStackError - On failure.

Returns:

  • int - Effective Node ID of the device (as defined by the assigned NOC)
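The call pattern for CommissionWithCode, and the point that the returned node ID is the one defined by the assigned NOC, can be sketched without a real device. Everything below is a stand-in: `FakeController` only mirrors the documented method name and its first two parameters, `commission` is a hypothetical wrapper, and the payload is a placeholder string.

```python
import asyncio

class FakeController:
    """Hypothetical stand-in mirroring part of the documented CommissionWithCode signature."""

    async def CommissionWithCode(self, setupPayload: str, nodeid: int) -> int:
        await asyncio.sleep(0)  # stands in for discovery, PASE and commissioning
        return nodeid           # a real controller returns the node ID from the assigned NOC

async def commission(controller, payload: str, nodeid: int) -> int:
    effective_node_id = await controller.CommissionWithCode(payload, nodeid)
    if effective_node_id != nodeid:
        # The effective ID is defined by the NOC, so later calls should use it.
        print(f"requested {nodeid}, device was assigned {effective_node_id}")
    return effective_node_id

# "MT:EXAMPLE" is a placeholder string, not a valid QR payload.
node_id = asyncio.run(commission(FakeController(), "MT:EXAMPLE", 0x1234))
print(hex(node_id))
```

With a real controller the same wrapper would also be the natural place to catch the documented ChipStackError.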

CommissionIP#

async def CommissionIP(ipaddr: str, setupPinCode: int, nodeid: int) -> int

DEPRECATED, DO NOT USE! Use CommissionOnNetwork or CommissionWithCode

Raises:

  • ChipStackError - On failure.

Returns:

  • int - Effective Node ID of the device (as defined by the assigned NOC)

NOCChainCallback#

def NOCChainCallback(nocChain)

Callback function for handling the NOC chain result.

Arguments:

  • nocChain - The NOC chain data object received.

Returns:

None

IssueNOCChain#

async def IssueNOCChain(
        csr: Clusters.OperationalCredentials.Commands.CSRResponse,
        nodeId: int)

Issue an NOC chain using the associated OperationalCredentialsDelegate. The NOC chain will be provided in TLV cert format.

Arguments:

  • csr Clusters.OperationalCredentials.Commands.CSRResponse - The Certificate Signing Request response.

  • nodeId int - Node id of the device.

Returns:

  • asyncio.Future - A future object that is the result of the NOC Chain operation.

SetDACRevocationSetPath#

def SetDACRevocationSetPath(dacRevocationSetPath: typing.Optional[str])

Set the path to the device attestation revocation set JSON file.

Arguments:

  • dacRevocationSetPath Optional[str] - Path to the JSON file containing the device attestation revocation set.

Raises:

  • ChipStackError - On failure.

BareChipDeviceController#

class BareChipDeviceController(ChipDeviceControllerBase)

A bare device controller without AutoCommissioner support.

__init__#

def __init__(operationalKey: p256keypair.P256Keypair,
             noc: bytes,
             icac: typing.Union[bytes, None],
             rcac: bytes,
             ipk: typing.Union[bytes, None],
             adminVendorId: int,
             name: typing.Optional[str] = None)

Creates a controller without AutoCommissioner.

The allocated controller uses the given noc, icac, rcac and ipk instead of the default, randomly generated certificates and keys. This is suitable for creating a controller for manually signing certificates for testing.

Arguments:

  • operationalKey - A P256Keypair object for the operational key of the controller.

  • noc bytes - The NOC for the controller, in bytes.

  • icac Optional[bytes] - The optional ICAC for the controller.

  • rcac bytes - The RCAC for the controller.

  • ipk Optional[bytes] - The optional IPK for the controller. When None is provided, the defaultIpk is used.

  • adminVendorId int - The adminVendorId of the controller.

  • name Optional[str] - The name of the controller, for debugging use only.

Raises:

  • ChipStackError - On failure.