diff --git a/examples/thread_example.py b/examples/thread_example.py deleted file mode 100644 index a7cb4fa..0000000 --- a/examples/thread_example.py +++ /dev/null @@ -1,149 +0,0 @@ -#!/usr/bin/env python3 -"""Example demonstrating the new thread-aware message retrieval functionality. - -This example shows how to use the new thread utilities to collect thread messages -in both 1:1 conversations and spaces, addressing the issues described in #256. - -Copyright (c) 2016-2024 Cisco and/or its affiliates. - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. -""" - -import os -import sys - -# Add the src directory to the path so we can import webexpythonsdk -sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src")) - -import webexpythonsdk -from webexpythonsdk.thread_utils import collect_thread_text_and_attachments - - -def main(): - """Main example function.""" - # Initialize the Webex API - # You'll need to set your access token as an environment variable - access_token = os.getenv("WEBEX_ACCESS_TOKEN") - if not access_token: - print("Please set WEBEX_ACCESS_TOKEN environment variable") - return - - api = webexpythonsdk.WebexAPI(access_token=access_token) - - print("Webex Thread-Aware Message Retrieval Example") - print("=" * 50) - - # Example 1: Using the new thread-aware API methods directly - print("\n1. Using thread-aware API methods:") - print("-" * 30) - - # This would be a message object from a webhook or API call - # For demonstration, we'll create a mock message - class MockMessage: - def __init__(self, message_id, parent_id, room_id, room_type, text): - self.id = message_id - self.parentId = parent_id - self.roomId = room_id - self.roomType = room_type - self.text = text - self.personId = "person123" - self.created = "2024-01-01T10:00:00Z" - - # Example message from a space (group room) - space_message = MockMessage( - message_id="msg123", - parent_id="parent456", - room_id="room789", - room_type="group", - text="This is a reply in a space thread", - ) - - try: - # Get thread context using the new API method - thread_context = api.messages.get_thread_context(space_message) - - print(f"Room Type: {thread_context['room_type']}") - print(f"Is Thread: {thread_context['is_thread']}") - print(f"Reply Count: {thread_context['reply_count']}") - print(f"Thread Messages: {len(thread_context['thread_messages'])}") - - if thread_context["error"]: - print(f"Error: {thread_context['error']}") - else: - print("Thread retrieved successfully!") - - except Exception as e: - print(f"Error retrieving thread context: {e}") - - # Example 2: Using the utility function (drop-in replacement) - print("\n2. Using the utility function:") - print("-" * 30) - - try: - # This is the drop-in replacement for the user's original function - thread_text, attachments = collect_thread_text_and_attachments( - api, space_message - ) - - print(f"Thread Text Length: {len(thread_text)} characters") - print(f"Attachments: {len(attachments)}") - print(f"Thread Text Preview: {thread_text[:100]}...") - - except Exception as e: - print(f"Error using utility function: {e}") - - # Example 3: Handling different room types - print("\n3. Handling different room types:") - print("-" * 30) - - # Direct room message - direct_message = MockMessage( - message_id="msg456", - parent_id="parent789", - room_id="room123", - room_type="direct", - text="This is a reply in a 1:1 conversation", - ) - - try: - # Check room type - is_direct = api.messages._is_direct_room(direct_message) - is_group = api.messages._is_group_room(direct_message) - - print(f"Message is from direct room: {is_direct}") - print(f"Message is from group room: {is_group}") - - except Exception as e: - print(f"Error checking room type: {e}") - - print("\nExample completed!") - print("\nTo use this in your bot:") - print( - "1. Replace your existing collect_thread_text_and_attachments function" - ) - print( - "2. Import: from webexpythonsdk.thread_utils import collect_thread_text_and_attachments" - ) - print( - "3. Call: thread_text, attachments = collect_thread_text_and_attachments(api, msg)" - ) - - -if __name__ == "__main__": - main() diff --git a/src/webexpythonsdk/__init__.py b/src/webexpythonsdk/__init__.py index d6f1908..43ae48a 100644 --- a/src/webexpythonsdk/__init__.py +++ b/src/webexpythonsdk/__init__.py @@ -74,7 +74,6 @@ WebhookEvent, ) from .models.simple import simple_data_factory, SimpleDataModel -from .thread_utils import collect_thread_text_and_attachments from .utils import WebexDateTime diff --git a/src/webexpythonsdk/api/messages.py b/src/webexpythonsdk/api/messages.py index 403a1f7..60d8845 100644 --- a/src/webexpythonsdk/api/messages.py +++ b/src/webexpythonsdk/api/messages.py @@ -394,183 +394,3 @@ def update(self, messageId=None, roomId=None, text=None, markdown=None): # Add edit() as an alias to the update() method for backward compatibility edit = update - - def _is_direct_room(self, message): - """Determine if a message is from a direct (1:1) room. - - Args: - message: Message object with roomType property - - Returns: - bool: True if the message is from a direct room, False otherwise - """ - if hasattr(message, "roomType"): - return message.roomType == "direct" - return False - - def _is_group_room(self, message): - """Determine if a message is from a group room (space). - - Args: - message: Message object with roomType property - - Returns: - bool: True if the message is from a group room, False otherwise - """ - if hasattr(message, "roomType"): - return message.roomType == "group" - return False - - def get_thread_messages(self, message, max_scan=500): - """Retrieve all messages in a thread, including the root message. - - This method provides a robust way to collect thread messages that works - for both 1:1 conversations and spaces, handling the different permission - models and API limitations. - - Args: - message: The message object to get the thread for - max_scan (int): Maximum number of messages to scan when searching for parent - - Returns: - tuple: (thread_messages, root_message, error_message) - - thread_messages: List of all messages in the thread (oldest to newest) - - root_message: The root message of the thread (or None if not found) - - error_message: Error description if any issues occurred - """ - thread_messages = [] - root_message = None - error_message = None - - parent_id = getattr(message, "parentId", None) - room_id = getattr(message, "roomId", None) - - if not parent_id or not room_id: - # Not a threaded message, return just this message - return [message], None, None - - try: - # Strategy 1: Try to get the parent message directly - try: - root_message = self.get(parent_id) - thread_messages.append(root_message) - except Exception: - # Direct retrieval failed, try alternative strategies - if self._is_direct_room(message): - # For direct rooms, try list_direct with parentId - try: - direct_messages = list( - self.list_direct( - personId=getattr(message, "toPersonId", None), - personEmail=getattr( - message, "toPersonEmail", None - ), - parentId=parent_id, - max=100, - ) - ) - if direct_messages: - root_message = direct_messages[0] - thread_messages.extend(direct_messages) - except Exception: - pass - else: - # For group rooms, try scanning recent messages - try: - scanned = 0 - for msg in self.list(roomId=room_id, max=100): - scanned += 1 - if getattr(msg, "id", None) == parent_id: - root_message = msg - thread_messages.append(msg) - break - if scanned >= max_scan: - break - except Exception: - pass - - if not root_message: - error_message = f"Could not retrieve parent message {parent_id}. Bot may have joined after thread started or lacks permission." - - # Strategy 2: Get all replies in the thread - try: - if self._is_direct_room(message): - # For direct rooms, use list_direct - replies = list( - self.list_direct( - personId=getattr(message, "toPersonId", None), - personEmail=getattr( - message, "toPersonEmail", None - ), - parentId=parent_id, - max=100, - ) - ) - else: - # For group rooms, use list - replies = list( - self.list(roomId=room_id, parentId=parent_id, max=100) - ) - - # Add replies to thread messages, avoiding duplicates - existing_ids = { - getattr(m, "id", None) for m in thread_messages - } - for reply in replies: - reply_id = getattr(reply, "id", None) - if reply_id and reply_id not in existing_ids: - thread_messages.append(reply) - existing_ids.add(reply_id) - - except Exception as e: - if not error_message: - error_message = ( - f"Could not retrieve thread replies: {str(e)}" - ) - - # Strategy 3: Ensure the original message is included - original_id = getattr(message, "id", None) - if original_id and not any( - getattr(m, "id", None) == original_id for m in thread_messages - ): - thread_messages.append(message) - - # Sort messages by creation time (oldest to newest) - thread_messages.sort(key=lambda m: getattr(m, "created", "")) - - except Exception as e: - error_message = f"Unexpected error retrieving thread: {str(e)}" - - return thread_messages, root_message, error_message - - def get_thread_context(self, message, max_scan=500): - """Get thread context including root message and all replies. - - This is a convenience method that returns a structured result with - thread information, making it easy to work with thread data. - - Args: - message: The message object to get thread context for - max_scan (int): Maximum number of messages to scan when searching for parent - - Returns: - dict: Dictionary containing: - - "thread_messages": List of all messages in thread (oldest to newest) - - "root_message": The root message of the thread - - "reply_count": Number of replies in the thread - - "is_thread": Boolean indicating if this is a threaded conversation - - "error": Error message if any issues occurred - - "room_type": Type of room (direct/group) - """ - thread_messages, root_message, error = self.get_thread_messages( - message, max_scan - ) - - return { - "thread_messages": thread_messages, - "root_message": root_message, - "reply_count": len(thread_messages) - 1 if root_message else 0, - "is_thread": getattr(message, "parentId", None) is not None, - "error": error, - "room_type": getattr(message, "roomType", "unknown"), - } diff --git a/src/webexpythonsdk/thread_utils.py b/src/webexpythonsdk/thread_utils.py deleted file mode 100644 index 97ade5d..0000000 --- a/src/webexpythonsdk/thread_utils.py +++ /dev/null @@ -1,179 +0,0 @@ -"""Thread utility functions for Webex Python SDK. - -This module provides utilities for working with threaded conversations in both -1:1 conversations and spaces, handling the different permission models and -API limitations. - -Copyright (c) 2016-2024 Cisco and/or its affiliates. - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. -""" - - -def collect_thread_text_and_attachments( - api, msg, max_scan=500, max_chars=60000 -): - """Robustly collect thread text + attachments for both 1:1 and spaces. - - This function provides a robust way to collect thread messages that works - for both 1:1 conversations and spaces, handling the different permission - models and API limitations described in issue #256. - - Strategy: - 1) Use the new thread-aware API methods - 2) Handle both direct and group room types appropriately - 3) Provide fallback mechanisms when direct retrieval fails - 4) Always include replies ordered oldest->newest - 5) Ensure the incoming message 'msg' is present - 6) If starter can't be found, add a placeholder notice - - Args: - api: WebexAPI instance with messages API - msg: The message object to collect thread for - max_scan (int): Maximum number of messages to scan when searching for parent - max_chars (int): Maximum characters for text content before truncation - - Returns: - tuple: (thread_text, [attachment_text]) where attachment_text is list with single big string - """ - author_cache = {} - thread_text_lines = [] - attachment_blocks = [] - - def process_single_message(m): - """Process a single message and extract text and attachments.""" - author = get_display_name( - getattr(m, "personId", "unknown"), author_cache - ) - mtext = (getattr(m, "text", "") or "").strip() - if mtext: - thread_text_lines.append(f"[{author}]: {mtext}") - - files = getattr(m, "files", None) - if files and hasattr(files, "__iter__") and not isinstance(files, str): - for f_url in files: - try: - content, fname, ctype = download_webex_file(f_url) - extracted = extract_text_from_file(content, fname, ctype) - attachment_blocks.append( - f"[Attachment {fname}]:\n{extracted}" - ) - except Exception as e: - # keep going; record the error in attachments so user sees it - attachment_blocks.append( - f"[Attachment error for {fname}]: {e}" - ) - - # Use the new thread-aware API method - try: - thread_context = api.messages.get_thread_context(msg, max_scan) - thread_messages = thread_context["thread_messages"] - root_message = thread_context["root_message"] - error = thread_context["error"] - - # Add error notice if we couldn't retrieve the root message - if error and not root_message: - thread_text_lines.append(f"[Thread retrieval error]: {error}") - thread_text_lines.append( - "[Starter message unavailable — bot may have joined after the thread started or lacks permission to read the original message.]" - ) - - except Exception as e: - # Fallback to processing just the single message - thread_messages = [msg] - root_message = None - error = f"Failed to retrieve thread context: {str(e)}" - thread_text_lines.append(f"[Thread retrieval error]: {error}") - - # Process all messages in the thread - seen_ids = set() - for m in thread_messages: - mid = getattr(m, "id", None) - if mid and mid in seen_ids: - continue - if mid: - seen_ids.add(mid) - process_single_message(m) - - # Combine and apply size limits - thread_text = "\n".join(thread_text_lines) - if len(thread_text) > max_chars: - thread_text = thread_text[:max_chars] + "\n...[truncated]" - - att_text = "\n\n".join(attachment_blocks) - if len(att_text) > max_chars: - att_text = att_text[:max_chars] + "\n...[attachments truncated]" - - return thread_text, [att_text] if att_text else [] - - -def get_display_name(person_id, author_cache): - """Get display name for a person ID with caching. - - This is a placeholder function. In a real implementation, you would - use the People API to get the display name. - - Args: - person_id: The person ID to get display name for - author_cache: Cache dictionary to store results - - Returns: - str: Display name or person ID if not found - """ - if person_id in author_cache: - return author_cache[person_id] - - # Placeholder implementation - in real usage, call People API - display_name = f"User-{person_id[:8]}" - author_cache[person_id] = display_name - return display_name - - -def download_webex_file(file_url): - """Download a file from Webex. - - This is a placeholder function. In a real implementation, you would - download the file from the provided URL. - - Args: - file_url: URL of the file to download - - Returns: - tuple: (content, filename, content_type) - """ - # Placeholder implementation - in real usage, download the file - return b"", "placeholder.txt", "text/plain" - - -def extract_text_from_file(content, filename, content_type): - """Extract text content from a file. - - This is a placeholder function. In a real implementation, you would - extract text based on the file type. - - Args: - content: File content as bytes - filename: Name of the file - content_type: MIME type of the file - - Returns: - str: Extracted text content - """ - # Placeholder implementation - in real usage, extract text based on file type - return f"Text content from {filename} (type: {content_type})" diff --git a/tests/api/test_messages.py b/tests/api/test_messages.py index 06b5cfc..7b82310 100644 --- a/tests/api/test_messages.py +++ b/tests/api/test_messages.py @@ -148,7 +148,7 @@ def group_room_text_message(group_room, send_group_room_message): def group_room_message_reply_by_id(api, group_room, group_room_text_message): text = create_string("Reply Message") return api.messages.create( - roomId=group_room.id, + # roomId=group_room.id, parentId=group_room_text_message.id, text=text, ) @@ -384,256 +384,3 @@ def test_update_message(api, group_room): message = api.messages.create(group_room.id, text=text) text = create_string("Message Updated") assert text == api.messages.edit(message.id, group_room.id, text).text - - -# Thread-aware message retrieval tests -def test_is_direct_room_detection(api, direct_message_by_person_id): - """Test room type detection for direct messages.""" - assert api.messages._is_direct_room(direct_message_by_person_id) is True - assert api.messages._is_group_room(direct_message_by_person_id) is False - - -def test_is_group_room_detection(api, group_room_text_message): - """Test room type detection for group room messages.""" - assert api.messages._is_group_room(group_room_text_message) is True - assert api.messages._is_direct_room(group_room_text_message) is False - - -def test_get_thread_context_direct_message(api, direct_message_reply_by_person_id): - """Test thread context retrieval for direct message threads.""" - context = api.messages.get_thread_context(direct_message_reply_by_person_id) - - assert context["is_thread"] is True - assert context["room_type"] == "direct" - assert context["reply_count"] >= 0 - assert isinstance(context["thread_messages"], list) - assert len(context["thread_messages"]) >= 1 - - # The original message should be in the thread - message_ids = [msg.id for msg in context["thread_messages"]] - assert direct_message_reply_by_person_id.id in message_ids - - -def test_get_thread_context_group_message(api, group_room_message_reply_by_id): - """Test thread context retrieval for group room message threads.""" - context = api.messages.get_thread_context(group_room_message_reply_by_id) - - assert context["is_thread"] is True - assert context["room_type"] == "group" - assert context["reply_count"] >= 0 - assert isinstance(context["thread_messages"], list) - assert len(context["thread_messages"]) >= 1 - - # The original message should be in the thread - message_ids = [msg.id for msg in context["thread_messages"]] - assert group_room_message_reply_by_id.id in message_ids - - -def test_get_thread_context_single_message(api, group_room_text_message): - """Test thread context for non-threaded messages.""" - context = api.messages.get_thread_context(group_room_text_message) - - assert context["is_thread"] is False - assert context["room_type"] == "group" - assert context["reply_count"] == 0 - assert len(context["thread_messages"]) == 1 - assert context["thread_messages"][0].id == group_room_text_message.id - - -def test_get_thread_messages_direct(api, direct_message_reply_by_person_id): - """Test thread message retrieval for direct messages.""" - thread_messages, root_message, error = api.messages.get_thread_messages( - direct_message_reply_by_person_id - ) - - assert isinstance(thread_messages, list) - assert len(thread_messages) >= 1 - assert error is None or isinstance(error, str) - - # The original message should be in the thread - message_ids = [msg.id for msg in thread_messages] - assert direct_message_reply_by_person_id.id in message_ids - - -def test_get_thread_messages_group(api, group_room_message_reply_by_id): - """Test thread message retrieval for group room messages.""" - thread_messages, root_message, error = api.messages.get_thread_messages( - group_room_message_reply_by_id - ) - - assert isinstance(thread_messages, list) - assert len(thread_messages) >= 1 - assert error is None or isinstance(error, str) - - # The original message should be in the thread - message_ids = [msg.id for msg in thread_messages] - assert group_room_message_reply_by_id.id in message_ids - - -def test_get_thread_messages_single_message(api, group_room_text_message): - """Test thread message retrieval for non-threaded messages.""" - thread_messages, root_message, error = api.messages.get_thread_messages( - group_room_text_message - ) - - assert isinstance(thread_messages, list) - assert len(thread_messages) == 1 - assert thread_messages[0].id == group_room_text_message.id - assert root_message is None - assert error is None - - -def test_thread_context_error_handling(api): - """Test error handling in thread context retrieval.""" - # Create a mock message with invalid data to test error handling - class MockMessage: - def __init__(self): - self.id = "invalid_message_id" - self.parentId = "invalid_parent_id" - self.roomId = "invalid_room_id" - self.roomType = "group" - self.created = "2024-01-01T10:00:00Z" - - mock_msg = MockMessage() - context = api.messages.get_thread_context(mock_msg) - - # Should handle errors gracefully - assert isinstance(context, dict) - assert "error" in context - assert "thread_messages" in context - assert "room_type" in context - assert context["room_type"] == "group" - - -def test_thread_messages_error_handling(api): - """Test error handling in thread message retrieval.""" - # Create a mock message with invalid data to test error handling - class MockMessage: - def __init__(self): - self.id = "invalid_message_id" - self.parentId = "invalid_parent_id" - self.roomId = "invalid_room_id" - self.roomType = "group" - self.created = "2024-01-01T10:00:00Z" - - mock_msg = MockMessage() - thread_messages, root_message, error = api.messages.get_thread_messages(mock_msg) - - # Should handle errors gracefully - assert isinstance(thread_messages, list) - assert isinstance(error, str) or error is None - assert root_message is None or isinstance(root_message, object) - - -def test_thread_context_room_type_consistency(api, direct_message_by_person_id, group_room_text_message): - """Test that room type detection is consistent across different message types.""" - # Test direct message - direct_context = api.messages.get_thread_context(direct_message_by_person_id) - assert direct_context["room_type"] == "direct" - - # Test group message - group_context = api.messages.get_thread_context(group_room_text_message) - assert group_context["room_type"] == "group" - - -def test_thread_messages_ordering(api, group_room_message_reply_by_id): - """Test that thread messages are returned in chronological order.""" - thread_messages, root_message, error = api.messages.get_thread_messages( - group_room_message_reply_by_id - ) - - if len(thread_messages) > 1: - # Messages should be ordered by creation time (oldest to newest) - for i in range(len(thread_messages) - 1): - current_created = getattr(thread_messages[i], "created", "") - next_created = getattr(thread_messages[i + 1], "created", "") - if current_created and next_created: - assert current_created <= next_created - - -def test_thread_context_with_max_scan_limit(api, group_room_message_reply_by_id): - """Test thread context with custom max_scan parameter.""" - # Test with a very small max_scan to ensure the parameter is respected - context = api.messages.get_thread_context(group_room_message_reply_by_id, max_scan=1) - - assert isinstance(context, dict) - assert "thread_messages" in context - assert "room_type" in context - assert context["room_type"] == "group" - - -def test_thread_messages_with_max_scan_limit(api, group_room_message_reply_by_id): - """Test thread messages with custom max_scan parameter.""" - # Test with a very small max_scan to ensure the parameter is respected - thread_messages, root_message, error = api.messages.get_thread_messages( - group_room_message_reply_by_id, max_scan=1 - ) - - assert isinstance(thread_messages, list) - assert isinstance(error, str) or error is None - - -def test_collect_thread_text_and_attachments_utility(api, group_room_message_reply_by_id): - """Test the collect_thread_text_and_attachments utility function with real data.""" - from webexpythonsdk.thread_utils import collect_thread_text_and_attachments - - thread_text, attachments = collect_thread_text_and_attachments(api, group_room_message_reply_by_id) - - # Verify return types - assert isinstance(thread_text, str) - assert isinstance(attachments, list) - assert len(attachments) >= 0 - - # Verify thread text contains the message content - assert len(thread_text) > 0 - - # Verify attachments is a list of strings (or empty) - for attachment in attachments: - assert isinstance(attachment, str) - - -def test_collect_thread_text_and_attachments_direct_message(api, direct_message_reply_by_person_id): - """Test the collect_thread_text_and_attachments utility function with direct messages.""" - from webexpythonsdk.thread_utils import collect_thread_text_and_attachments - - thread_text, attachments = collect_thread_text_and_attachments(api, direct_message_reply_by_person_id) - - # Verify return types - assert isinstance(thread_text, str) - assert isinstance(attachments, list) - assert len(attachments) >= 0 - - # Verify thread text contains the message content - assert len(thread_text) > 0 - - -def test_collect_thread_text_and_attachments_single_message(api, group_room_text_message): - """Test the collect_thread_text_and_attachments utility function with single messages.""" - from webexpythonsdk.thread_utils import collect_thread_text_and_attachments - - thread_text, attachments = collect_thread_text_and_attachments(api, group_room_text_message) - - # Verify return types - assert isinstance(thread_text, str) - assert isinstance(attachments, list) - assert len(attachments) >= 0 - - # Verify thread text contains the message content - assert len(thread_text) > 0 - - -def test_collect_thread_text_and_attachments_with_custom_limits(api, group_room_message_reply_by_id): - """Test the collect_thread_text_and_attachments utility function with custom parameters.""" - from webexpythonsdk.thread_utils import collect_thread_text_and_attachments - - # Test with custom max_scan and max_chars - thread_text, attachments = collect_thread_text_and_attachments( - api, group_room_message_reply_by_id, max_scan=10, max_chars=1000 - ) - - # Verify return types - assert isinstance(thread_text, str) - assert isinstance(attachments, list) - - # Verify max_chars limit is respected - assert len(thread_text) <= 1000