Skip to content

iomanager

This module defines the IoManager class which manages I/O for file objects connected to an existing gdb process or pty.

IoManager

Source code in pygdbmi/IoManager.py
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
class IoManager:
    """Read from and write to the file objects of an already-running gdb process or pty."""

    def __init__(
        self,
        stdin: io.BufferedWriter,
        stdout: io.BufferedReader,
        stderr: Optional[io.BufferedReader],
        time_to_check_for_additional_output_sec=DEFAULT_TIME_TO_CHECK_FOR_ADDITIONAL_OUTPUT_SEC,
    ):
        """
        Manage I/O for file objects created before calling this class.
        This can be useful if the gdb process is managed elsewhere, or if a
        pty is used.

        Args:
            stdin: File object that commands are written to.
            stdout: File object that responses are read from.
            stderr: Optional second file object to read from; may be None.
            time_to_check_for_additional_output_sec: After some output has been
                received, how much longer to keep polling for more. A value
                that is not > 0 disables shortening of the overall timeout.
        """

        self.stdin = stdin
        self.stdout = stdout
        self.stderr = stderr

        self.stdin_fileno = self.stdin.fileno()
        self.stdout_fileno = self.stdout.fileno()
        # -1 is a placeholder meaning "no stderr stream was supplied"
        self.stderr_fileno = self.stderr.fileno() if self.stderr else -1

        # File descriptors handed to select() for reading / writing.
        # NOTE(review): only stdout is registered here, so unless a caller
        # appends stderr_fileno to read_list, select() never reports stderr.
        self.read_list: List[int] = []
        if self.stdout:
            self.read_list.append(self.stdout_fileno)
        self.write_list = [self.stdin_fileno]

        # Per-stream leftover from the previous read, carried into the next one
        self._incomplete_output: Dict[str, Any] = {"stdout": None, "stderr": None}
        self.time_to_check_for_additional_output_sec = (
            time_to_check_for_additional_output_sec
        )
        self._allow_overwrite_timeout_times = (
            self.time_to_check_for_additional_output_sec > 0
        )
        make_non_blocking(self.stdout)
        if self.stderr:
            make_non_blocking(self.stderr)

    def get_gdb_response(
        self, timeout_sec: float = DEFAULT_GDB_TIMEOUT_SEC, raise_error_on_timeout=True
    ):
        """Get response from GDB, and block while doing so. If GDB does not have any response ready to be read
        by timeout_sec, an exception is raised.

        Args:
            timeout_sec: Maximum time to wait for response. Negative values are replaced with 0
                (a warning is logged). A value of 0 performs a single read attempt.
            raise_error_on_timeout: Whether an exception should be raised if no response was found after timeout_sec

        Returns:
            List of parsed GDB responses, returned from gdbmiparser.parse_response, with the
            additional key 'stream' which is either 'stdout' or 'stderr'

        Raises:
            GdbTimeoutError: if response is not received within timeout_sec
            ValueError: if select returned unexpected file number
        """

        if timeout_sec < 0:
            logger.warning("timeout_sec was negative, replacing with 0")
            timeout_sec = 0

        if USING_WINDOWS:
            retval = self._get_responses_windows(timeout_sec)
        else:
            retval = self._get_responses_unix(timeout_sec)

        if not retval and raise_error_on_timeout:
            raise GdbTimeoutError(
                "Did not get response from gdb after %s seconds" % timeout_sec
            )

        return retval

    def _get_responses_windows(self, timeout_sec):
        """Get responses on windows. Assume no support for select and use a while loop.

        Args:
            timeout_sec: Maximum number of seconds to keep polling; 0 means poll once.

        Returns:
            List of parsed responses gathered from stdout and, when present, stderr.
        """
        timeout_time_sec = time.time() + timeout_sec
        responses = []
        while True:
            responses_list = []
            try:
                self.stdout.flush()
                raw_output = self.stdout.readline().replace(b"\r", b"\n")
                responses_list = self._get_responses_list(raw_output, "stdout")
            except IOError:
                # best-effort read: an IOError (presumably "no data available"
                # on the non-blocking pipe) just means nothing to add this pass
                pass

            # stderr is optional; without this guard a None stderr raised
            # AttributeError, which the IOError handler does not catch
            if self.stderr:
                try:
                    self.stderr.flush()
                    raw_output = self.stderr.readline().replace(b"\r", b"\n")
                    responses_list += self._get_responses_list(raw_output, "stderr")
                except IOError:
                    pass

            responses += responses_list
            if timeout_sec == 0:
                break
            elif responses_list and self._allow_overwrite_timeout_times:
                # output arrived: pull the deadline closer, never push it out
                timeout_time_sec = min(
                    time.time() + self.time_to_check_for_additional_output_sec,
                    timeout_time_sec,
                )
            elif time.time() > timeout_time_sec:
                break

        return responses

    def _get_responses_unix(self, timeout_sec):
        """Get responses on unix-like system. Use select to wait for output.

        Args:
            timeout_sec: Maximum number of seconds to wait; 0 means poll once.

        Returns:
            List of parsed responses read from the descriptors in read_list.

        Raises:
            ValueError: if select reports a descriptor that is neither stdout nor stderr
        """
        timeout_time_sec = time.time() + timeout_sec
        responses = []
        while True:
            # never hand select() a negative timeout
            select_timeout = max(0, timeout_time_sec - time.time())
            events, _, _ = select.select(self.read_list, [], [], select_timeout)
            responses_list = None  # reset so stale data cannot extend the loop
            for fileno in events:
                # new data is ready to read
                if fileno == self.stdout_fileno:
                    self.stdout.flush()
                    raw_output = self.stdout.read()
                    stream = "stdout"

                elif fileno == self.stderr_fileno:
                    self.stderr.flush()
                    raw_output = self.stderr.read()
                    stream = "stderr"

                else:
                    raise ValueError(
                        "Developer error. Got unexpected file number %d" % fileno
                    )
                responses_list = self._get_responses_list(raw_output, stream)
                responses += responses_list

            if timeout_sec == 0:  # just exit immediately
                break

            elif responses_list and self._allow_overwrite_timeout_times:
                # update timeout time to potentially be closer to now to avoid lengthy wait times when nothing is being output by gdb
                timeout_time_sec = min(
                    time.time() + self.time_to_check_for_additional_output_sec,
                    timeout_time_sec,
                )

            elif time.time() > timeout_time_sec:
                break

        return responses

    def _get_responses_list(
        self, raw_output: bytes, stream: str
    ) -> List[Dict[Any, Any]]:
        """Get parsed response list from raw output.

        Args:
            raw_output: bytes read from gdb that should be parsed
            stream: either "stdout" or "stderr"; used as the buffer key and
                stored under the "stream" key of every parsed response

        Returns:
            List of parsed responses; empty when there is nothing to parse yet.
        """
        responses: List[Dict[Any, Any]] = []

        # The helper returns the output to parse now plus a remainder that is
        # stored per stream and passed back in on the next call.
        (_new_output, self._incomplete_output[stream],) = _buffer_incomplete_responses(
            raw_output, self._incomplete_output.get(stream)
        )

        if not _new_output:
            return responses

        # undecodable bytes are replaced rather than raising; blank lines dropped
        response_list = [
            line for line in _new_output.decode(errors="replace").split("\n") if line
        ]

        # parse each response from gdb into a dict, and store in a list
        for response in response_list:
            if gdbmiparser.response_is_finished(response):
                # lines the parser reports as "finished" carry no payload
                continue

            parsed_response = gdbmiparser.parse_response(response)
            parsed_response["stream"] = stream

            logger.debug("%s", pformat(parsed_response))

            responses.append(parsed_response)

        return responses

    def write(
        self,
        mi_cmd_to_write: Union[str, List[str]],
        timeout_sec=DEFAULT_GDB_TIMEOUT_SEC,
        raise_error_on_timeout: bool = True,
        read_response: bool = True,
    ):
        """Write to gdb process. Block while parsing responses from gdb for a maximum of timeout_sec.

        Args:
            mi_cmd_to_write: String to write to gdb. If list, it is joined by newlines.
            timeout_sec: Maximum number of seconds to wait for response before exiting. Negative
                values are replaced with 0 (a warning is logged).
            raise_error_on_timeout: If read_response is True, raise error if no response is received
            read_response: Block and read response. If there is a separate thread running, this can be
                false, and the reading thread reads the output.
        Returns:
            List of parsed gdb responses if read_response is True, otherwise []
        Raises:
            TypeError: if mi_cmd_to_write is not valid
        """
        if timeout_sec < 0:
            logger.warning("timeout_sec was negative, replacing with 0")
            timeout_sec = 0

        # Ensure proper type of the mi command
        if isinstance(mi_cmd_to_write, str):
            mi_cmd_to_write_str = mi_cmd_to_write
        elif isinstance(mi_cmd_to_write, list):
            mi_cmd_to_write_str = "\n".join(mi_cmd_to_write)
        else:
            raise TypeError(
                "The gdb mi command must a be str or list. Got "
                + str(type(mi_cmd_to_write))
            )

        logger.debug("writing: %s", mi_cmd_to_write)

        # the command must be newline-terminated
        if not mi_cmd_to_write_str.endswith("\n"):
            mi_cmd_to_write_nl = mi_cmd_to_write_str + "\n"
        else:
            mi_cmd_to_write_nl = mi_cmd_to_write_str

        if USING_WINDOWS:
            # select not implemented in windows for pipes
            # assume it's always ready
            outputready = [self.stdin_fileno]
        else:
            _, outputready, _ = select.select([], self.write_list, [], timeout_sec)

        if not outputready:
            # previously this case was silent: the command was simply dropped
            logger.warning(
                "gdb stdin was not ready for writing within %s seconds; "
                "command was not sent",
                timeout_sec,
            )

        for fileno in outputready:
            if fileno == self.stdin_fileno:
                # ready to write
                self.stdin.write(mi_cmd_to_write_nl.encode())  # type: ignore
                # must flush, otherwise gdb won't realize there is data
                # to evaluate, and we won't get a response
                self.stdin.flush()  # type: ignore
            else:
                logger.error("got unexpected fileno %d", fileno)

        if read_response is True:
            return self.get_gdb_response(
                timeout_sec=timeout_sec, raise_error_on_timeout=raise_error_on_timeout
            )

        return []

__init__(stdin, stdout, stderr, time_to_check_for_additional_output_sec=DEFAULT_TIME_TO_CHECK_FOR_ADDITIONAL_OUTPUT_SEC)

Manage I/O for file objects created before calling this class. This can be useful if the gdb process is managed elsewhere, or if a pty is used.

Source code in pygdbmi/IoManager.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def __init__(
    self,
    stdin: io.BufferedWriter,
    stdout: io.BufferedReader,
    stderr: Optional[io.BufferedReader],
    time_to_check_for_additional_output_sec=DEFAULT_TIME_TO_CHECK_FOR_ADDITIONAL_OUTPUT_SEC,
):
    """
    Set up I/O management for file objects that already exist.

    Handy when the gdb process is owned by other code, or when a pty is
    used instead of pipes.

    Args:
        stdin: File object that commands are written to.
        stdout: File object that responses are read from.
        stderr: Optional second file object to read from; may be None.
        time_to_check_for_additional_output_sec: Stored on the instance; a
            value > 0 sets the _allow_overwrite_timeout_times flag.
    """

    self.stdin, self.stdout, self.stderr = stdin, stdout, stderr

    # Cache the descriptors; -1 stands in for a missing stderr.
    self.stdin_fileno = stdin.fileno()
    self.stdout_fileno = stdout.fileno()
    self.stderr_fileno = stderr.fileno() if stderr else -1

    # Descriptor lists for reading and writing: only stdout is listed for reading.
    self.read_list: List[int] = [self.stdout_fileno] if stdout else []
    self.write_list = [self.stdin_fileno]

    # Per-stream leftover output kept between reads.
    self._incomplete_output: Dict[str, Any] = dict(stdout=None, stderr=None)

    self.time_to_check_for_additional_output_sec = (
        time_to_check_for_additional_output_sec
    )
    self._allow_overwrite_timeout_times = time_to_check_for_additional_output_sec > 0

    # Switch every readable stream we were given to non-blocking mode.
    make_non_blocking(stdout)
    if stderr:
        make_non_blocking(stderr)

get_gdb_response(timeout_sec=DEFAULT_GDB_TIMEOUT_SEC, raise_error_on_timeout=True)

Get response from GDB, and block while doing so. If GDB does not have any response ready to be read by timeout_sec, an exception is raised.

Parameters:

Name Type Description Default
timeout_sec float

Maximum time to wait for response. Must be >= 0. Will return after timeout_sec at the latest, possibly sooner once output has been received.

DEFAULT_GDB_TIMEOUT_SEC
raise_error_on_timeout

Whether an exception should be raised if no response was found after timeout_sec

True

Returns:

Type Description

List of parsed GDB responses, returned from gdbmiparser.parse_response, with the

additional key 'stream' which is either 'stdout' or 'stderr'

Raises:

Type Description
GdbTimeoutError

if response is not received within timeout_sec

ValueError

if select returned unexpected file number

Source code in pygdbmi/IoManager.py
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def get_gdb_response(
    self, timeout_sec: float = DEFAULT_GDB_TIMEOUT_SEC, raise_error_on_timeout=True
):
    """Block until GDB produces a response or the timeout elapses.

    Args:
        timeout_sec: Upper bound, in seconds, on how long to wait. A negative
            value is replaced with 0 and a warning is logged.
        raise_error_on_timeout: When true, an empty result is reported by
            raising instead of being returned.

    Returns:
        List of parsed GDB responses, as produced by gdbmiparser.parse_response,
        each carrying the extra key 'stream' ('stdout' or 'stderr').

    Raises:
        GdbTimeoutError: nothing was received and raise_error_on_timeout is true
        ValueError: select reported an unexpected file number
    """

    if timeout_sec < 0:
        logger.warning("timeout_sec was negative, replacing with 0")
        timeout_sec = 0

    # Pick the platform-specific reader, then run it.
    read_responses = (
        self._get_responses_windows if USING_WINDOWS else self._get_responses_unix
    )
    parsed = read_responses(timeout_sec)

    if raise_error_on_timeout and not parsed:
        raise GdbTimeoutError(
            "Did not get response from gdb after %s seconds" % timeout_sec
        )

    return parsed

write(mi_cmd_to_write, timeout_sec=DEFAULT_GDB_TIMEOUT_SEC, raise_error_on_timeout=True, read_response=True)

Write to gdb process. Block while parsing responses from gdb for a maximum of timeout_sec.

Parameters:

Name Type Description Default
mi_cmd_to_write Union[str, List[str]]

String to write to gdb. If list, it is joined by newlines.

required
timeout_sec

Maximum number of seconds to wait for response before exiting. Must be >= 0.

DEFAULT_GDB_TIMEOUT_SEC
raise_error_on_timeout bool

If read_response is True, raise error if no response is received

True
read_response bool

Block and read response. If there is a separate thread running, this can be false, and the reading thread reads the output.

True

Returns:

Type Description

List of parsed gdb responses if read_response is True, otherwise []

Raises:

Type Description
TypeError

if mi_cmd_to_write is not valid

Source code in pygdbmi/IoManager.py
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
def write(
    self,
    mi_cmd_to_write: Union[str, List[str]],
    timeout_sec=DEFAULT_GDB_TIMEOUT_SEC,
    raise_error_on_timeout: bool = True,
    read_response: bool = True,
):
    """Write to gdb process. Block while parsing responses from gdb for a maximum of timeout_sec.

    Args:
        mi_cmd_to_write: String to write to gdb. If list, it is joined by newlines.
        timeout_sec: Maximum number of seconds to wait for response before exiting. Negative
            values are replaced with 0 (a warning is logged).
        raise_error_on_timeout: If read_response is True, raise error if no response is received
        read_response: Block and read response. If there is a separate thread running, this can be
            false, and the reading thread reads the output.
    Returns:
        List of parsed gdb responses if read_response is True, otherwise []
    Raises:
        TypeError: if mi_cmd_to_write is not valid
    """
    if timeout_sec < 0:
        logger.warning("timeout_sec was negative, replacing with 0")
        timeout_sec = 0

    # Ensure proper type of the mi command
    if isinstance(mi_cmd_to_write, str):
        mi_cmd_to_write_str = mi_cmd_to_write
    elif isinstance(mi_cmd_to_write, list):
        mi_cmd_to_write_str = "\n".join(mi_cmd_to_write)
    else:
        raise TypeError(
            "The gdb mi command must a be str or list. Got "
            + str(type(mi_cmd_to_write))
        )

    logger.debug("writing: %s", mi_cmd_to_write)

    # the command must be newline-terminated
    if not mi_cmd_to_write_str.endswith("\n"):
        mi_cmd_to_write_nl = mi_cmd_to_write_str + "\n"
    else:
        mi_cmd_to_write_nl = mi_cmd_to_write_str

    if USING_WINDOWS:
        # select not implemented in windows for pipes
        # assume it's always ready
        outputready = [self.stdin_fileno]
    else:
        _, outputready, _ = select.select([], self.write_list, [], timeout_sec)

    if not outputready:
        # previously this case was silent: the command was simply dropped
        logger.warning(
            "gdb stdin was not ready for writing within %s seconds; "
            "command was not sent",
            timeout_sec,
        )

    for fileno in outputready:
        if fileno == self.stdin_fileno:
            # ready to write
            self.stdin.write(mi_cmd_to_write_nl.encode())  # type: ignore
            # must flush, otherwise gdb won't realize there is data
            # to evaluate, and we won't get a response
            self.stdin.flush()  # type: ignore
        else:
            logger.error("got unexpected fileno %d", fileno)

    if read_response is True:
        return self.get_gdb_response(
            timeout_sec=timeout_sec, raise_error_on_timeout=raise_error_on_timeout
        )

    return []

make_non_blocking(file_obj)

Make a file object non-blocking. Windows doesn't have the fcntl module, but someone on Stack Overflow supplied this code as an answer, and it works: http://stackoverflow.com/a/34504971/2893090

Source code in pygdbmi/IoManager.py
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
def make_non_blocking(file_obj: io.IOBase):
    """Make a file object non-blocking.

    Windows doesn't have the fcntl module, but someone on
    Stack Overflow supplied this code as an answer, and it works:
    http://stackoverflow.com/a/34504971/2893090

    Args:
        file_obj: File object whose underlying descriptor/handle is switched
            to non-blocking mode.

    Raises:
        ValueError: on Windows, if SetNamedPipeHandleState reports failure
    """

    if USING_WINDOWS:
        LPDWORD = POINTER(DWORD)
        PIPE_NOWAIT = wintypes.DWORD(0x00000001)

        SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
        SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
        SetNamedPipeHandleState.restype = BOOL

        # translate the C runtime file descriptor into an OS handle
        h = msvcrt.get_osfhandle(file_obj.fileno())  # type: ignore

        res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
        if res == 0:
            # a zero return value signals failure
            raise ValueError(WinError())

    else:
        # Add O_NONBLOCK to the file status flags so we can attempt to read
        # from a pipe with no new data without locking the program up.
        # F_SETFL replaces the whole flag set, so read the current flags first
        # and OR the new bit in instead of discarding what was already set.
        current_flags = fcntl.fcntl(file_obj, fcntl.F_GETFL)
        fcntl.fcntl(file_obj, fcntl.F_SETFL, current_flags | os.O_NONBLOCK)